jj_lib/
stacked_table.rs

1// Copyright 2021 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A persistent table of fixed-size keys to variable-size values.
16//!
17//! The keys are stored in sorted order, with each key followed by an
18//! integer offset into the list of values. The values are
19//! concatenated after the keys. A file may have a parent file, and
20//! the parent may have its own parent, and so on. The child file then
21//! represents the union of the entries.
22
23#![expect(missing_docs)]
24
25use std::cmp::Ordering;
26use std::collections::BTreeMap;
27use std::collections::HashMap;
28use std::collections::HashSet;
29use std::fs;
30use std::fs::File;
31use std::io;
32use std::io::Read;
33use std::io::Write as _;
34use std::iter;
35use std::path::PathBuf;
36use std::sync::Arc;
37use std::sync::RwLock;
38use std::time::SystemTime;
39
40use blake2::Blake2b512;
41use blake2::Digest as _;
42use tempfile::NamedTempFile;
43use thiserror::Error;
44
45use crate::file_util::IoResultExt as _;
46use crate::file_util::PathError;
47use crate::file_util::persist_content_addressed_temp_file;
48use crate::hex_util;
49use crate::lock::FileLock;
50use crate::lock::FileLockError;
51
// BLAKE2b-512 hash length in hex string: segment files are named by the
// hex-encoded 64-byte digest of their content (see `MutableTable::save_in()`).
const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
54
55pub trait TableSegment {
56    fn segment_num_entries(&self) -> usize;
57    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
58    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
59    fn segment_add_entries_to(&self, mut_table: &mut MutableTable);
60
61    fn num_entries(&self) -> usize {
62        if let Some(parent_file) = self.segment_parent_file() {
63            parent_file.num_entries() + self.segment_num_entries()
64        } else {
65            self.segment_num_entries()
66        }
67    }
68
69    fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
70        self.segment_get_value(key)
71            .or_else(|| self.segment_parent_file()?.get_value(key))
72    }
73}
74
/// An immutable table segment, typically backed by a file on disk, with an
/// optional chain of parent segments.
pub struct ReadonlyTable {
    // Length in bytes of every key in this table (and its ancestors)
    key_size: usize,
    parent_file: Option<Arc<Self>>,
    // Segment file name: hex of the BLAKE2b-512 hash of the file content
    name: String,
    // Number of entries not counting the parent file
    num_local_entries: usize,
    // The file's entries in the raw format they're stored in on disk.
    index: Vec<u8>,
    values: Vec<u8>,
}
85
impl ReadonlyTable {
    /// Reads a table segment named `name` from `file`.
    ///
    /// On-disk layout (all integers are little-endian u32):
    /// 1. parent file name length (0 means no parent)
    /// 2. parent file name bytes (UTF-8)
    /// 3. number of local entries
    /// 4. index: per entry, the key followed by a 4-byte value offset
    /// 5. all values concatenated
    ///
    /// The parent segment (if any) is loaded recursively through `store`.
    fn load_from(
        file: &mut dyn Read,
        store: &TableStore,
        name: String,
        key_size: usize,
    ) -> TableStoreResult<Arc<Self>> {
        let to_load_err = |err| TableStoreError::LoadSegment {
            name: name.clone(),
            err,
        };
        // Reads a single little-endian u32 from the stream.
        let read_u32 = |file: &mut dyn Read| -> TableStoreResult<u32> {
            let mut buf = [0; 4];
            file.read_exact(&mut buf).map_err(to_load_err)?;
            Ok(u32::from_le_bytes(buf))
        };
        let parent_filename_len = read_u32(file)?;
        let maybe_parent_file = if parent_filename_len > 0 {
            let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
            file.read_exact(&mut parent_filename_bytes)
                .map_err(to_load_err)?;
            let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
            let parent_file = store.load_table(parent_filename)?;
            Some(parent_file)
        } else {
            None
        };
        let num_local_entries = read_u32(file)? as usize;
        // Index entries are fixed-size, so the remaining bytes split cleanly
        // into the index part and the values part.
        let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
        let mut data = vec![];
        file.read_to_end(&mut data).map_err(to_load_err)?;
        let values = data.split_off(index_size);
        let index = data;
        Ok(Arc::new(Self {
            key_size,
            parent_file: maybe_parent_file,
            name,
            num_local_entries,
            index,
            values,
        }))
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Iterates ancestor table segments including `self`.
    pub fn ancestor_segments(self: &Arc<Self>) -> impl Iterator<Item = &Arc<Self>> {
        iter::successors(Some(self), |table| table.segment_parent_file())
    }

    /// Creates a new mutable table stacked on top of this one.
    pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
        MutableTable::incremental(self.clone())
    }

    // Byte offset into `values` where the value for local entry `pos`
    // starts. `pos` may be one past the last entry, in which case the end of
    // the values buffer is returned (so it can serve as an exclusive bound).
    fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
        if pos == self.num_local_entries {
            self.values.len()
        } else {
            ReadonlyTableIndexEntry::new(self, pos).value_offset()
        }
    }

    // The value bytes for local entry `pos`, delimited by its own offset and
    // the next entry's offset.
    fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
        &self.values
            [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
    }
}
155
156impl TableSegment for ReadonlyTable {
157    fn segment_num_entries(&self) -> usize {
158        self.num_local_entries
159    }
160
161    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
162        self.parent_file.as_ref()
163    }
164
165    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
166        let mut low_pos = 0;
167        let mut high_pos = self.num_local_entries;
168        loop {
169            if high_pos == low_pos {
170                return None;
171            }
172            let mid_pos = (low_pos + high_pos) / 2;
173            let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
174            match key.cmp(mid_entry.key()) {
175                Ordering::Less => {
176                    high_pos = mid_pos;
177                }
178                Ordering::Equal => {
179                    return Some(self.segment_value_by_pos(mid_pos));
180                }
181                Ordering::Greater => {
182                    low_pos = mid_pos + 1;
183                }
184            }
185        }
186    }
187
188    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
189        for pos in 0..self.num_local_entries {
190            let entry = ReadonlyTableIndexEntry::new(self, pos);
191            mut_table.add_entry(
192                entry.key().to_vec(),
193                self.segment_value_by_pos(pos).to_vec(),
194            );
195        }
196    }
197}
198
199struct ReadonlyTableIndexEntry<'table> {
200    data: &'table [u8],
201}
202
203impl<'table> ReadonlyTableIndexEntry<'table> {
204    fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
205        let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
206        let offset = entry_size * pos;
207        let data = &table.index[offset..][..entry_size];
208        Self { data }
209    }
210
211    fn size(key_size: usize) -> usize {
212        key_size + 4
213    }
214
215    fn key(&self) -> &'table [u8] {
216        &self.data[0..self.data.len() - 4]
217    }
218
219    fn value_offset(&self) -> usize {
220        u32::from_le_bytes(self.data[self.data.len() - 4..].try_into().unwrap()) as usize
221    }
222}
223
/// An in-memory set of new entries stacked on top of an optional parent
/// `ReadonlyTable`.
pub struct MutableTable {
    // Length in bytes of every key
    key_size: usize,
    // Parent segment that this table's entries are layered over
    parent_file: Option<Arc<ReadonlyTable>>,
    // New entries; BTreeMap keeps keys sorted, matching the on-disk order
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}
229
impl MutableTable {
    /// Creates an empty table with no parent segment.
    fn full(key_size: usize) -> Self {
        Self {
            key_size,
            parent_file: None,
            entries: BTreeMap::new(),
        }
    }

    /// Creates an empty table stacked on top of `parent_file`, inheriting
    /// its key size.
    fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
        let key_size = parent_file.key_size;
        Self {
            key_size,
            parent_file: Some(parent_file),
            entries: BTreeMap::new(),
        }
    }

    /// Adds an entry, replacing any existing value for the same key.
    ///
    /// Panics if `key` doesn't match the table's key size.
    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
        assert_eq!(key.len(), self.key_size);
        self.entries.insert(key, value);
    }

    // Copies all of `other`'s local entries into this table.
    fn add_entries_from(&mut self, other: &dyn TableSegment) {
        other.segment_add_entries_to(self);
    }

    /// Merges the entries of `other` and its ancestors into `self`.
    ///
    /// Walks both parent chains looking for a common ancestor (identified by
    /// segment name), collecting the segments unique to `other`'s chain, and
    /// then replays their entries into `self`.
    fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
        let mut maybe_own_ancestor = self.parent_file.clone();
        let mut maybe_other_ancestor = Some(other.clone());
        let mut files_to_add = vec![];
        loop {
            if maybe_other_ancestor.is_none() {
                break;
            }
            let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
            if maybe_own_ancestor.is_none() {
                // Our chain is exhausted: everything remaining on the other
                // chain is new to us.
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
                continue;
            }
            let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
            if own_ancestor.name == other_ancestor.name {
                // Found the common ancestor; its entries are already covered.
                break;
            }
            // Step down whichever chain has more total entries, so the two
            // walks stay roughly in sync while searching for a common
            // ancestor.
            if own_ancestor.num_entries() < other_ancestor.num_entries() {
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
            } else {
                maybe_own_ancestor = own_ancestor.parent_file.clone();
            }
        }

        // Replay oldest-first so entries from newer segments overwrite older
        // ones on key collisions.
        for file in files_to_add.iter().rev() {
            self.add_entries_from(file.as_ref());
        }
    }

    /// Serializes the table into the on-disk segment format parsed by
    /// `ReadonlyTable::load_from()`.
    fn serialize(self) -> Vec<u8> {
        let mut buf = vec![];

        // Header: parent file name length (0 if none), then the name bytes.
        if let Some(parent_file) = &self.parent_file {
            buf.extend(u32::try_from(parent_file.name.len()).unwrap().to_le_bytes());
            buf.extend_from_slice(parent_file.name.as_bytes());
        } else {
            buf.extend(0_u32.to_le_bytes());
        }

        buf.extend(u32::try_from(self.entries.len()).unwrap().to_le_bytes());

        // Index section: keys in sorted order (BTreeMap iteration order),
        // each followed by its value's offset into the values section.
        let mut value_offset = 0_u32;
        for (key, value) in &self.entries {
            buf.extend_from_slice(key);
            buf.extend(value_offset.to_le_bytes());
            value_offset += u32::try_from(value.len()).unwrap();
        }
        // Values section: all values concatenated, in the same key order.
        for value in self.entries.values() {
            buf.extend_from_slice(value);
        }
        buf
    }

    /// If the MutableTable has more than half the entries of its parent
    /// ReadonlyTable, return MutableTable with the commits from both. This
    /// is done recursively, so the stack of index files has O(log n) files.
    #[expect(clippy::assigning_clones)]
    fn maybe_squash_with_ancestors(self) -> Self {
        let mut num_new_entries = self.entries.len();
        let mut files_to_squash = vec![];
        let mut maybe_parent_file = self.parent_file.clone();
        let mut squashed;
        loop {
            match maybe_parent_file {
                Some(parent_file) => {
                    // TODO: We should probably also squash if the parent file has less than N
                    // commits, regardless of how many (few) are in `self`.
                    if 2 * num_new_entries < parent_file.num_local_entries {
                        // Parent is large enough to keep as a separate layer.
                        squashed = Self::incremental(parent_file);
                        break;
                    }
                    num_new_entries += parent_file.num_local_entries;
                    files_to_squash.push(parent_file.clone());
                    maybe_parent_file = parent_file.parent_file.clone();
                }
                None => {
                    squashed = Self::full(self.key_size);
                    break;
                }
            }
        }

        if files_to_squash.is_empty() {
            return self;
        }

        // Replay the squashed ancestors oldest-first, then our own entries on
        // top, so newer entries win on key collisions.
        for parent_file in files_to_squash.iter().rev() {
            squashed.add_entries_from(parent_file.as_ref());
        }
        squashed.add_entries_from(&self);
        squashed
    }

    /// Writes the table to disk and returns the resulting `ReadonlyTable`.
    ///
    /// If there's nothing new relative to the parent, returns the parent
    /// unchanged. Otherwise the (possibly squashed) table is written to a
    /// content-addressed file via an atomically persisted temporary file.
    fn save_in(mut self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
        if self.entries.is_empty()
            && let Some(parent_file) = self.parent_file.take()
        {
            return Ok(parent_file);
        }

        let buf = self.maybe_squash_with_ancestors().serialize();
        // The file name is the hex-encoded BLAKE2b-512 hash of the content.
        let mut hasher = Blake2b512::new();
        hasher.update(&buf);
        let file_id_hex = hex_util::encode_hex(&hasher.finalize());
        let file_path = store.dir.join(&file_id_hex);
        let to_save_err = |err| TableStoreError::SaveSegment {
            name: file_id_hex.clone(),
            err,
        };

        let mut temp_file = NamedTempFile::new_in(&store.dir).map_err(to_save_err)?;
        let file = temp_file.as_file_mut();
        file.write_all(&buf).map_err(to_save_err)?;
        persist_content_addressed_temp_file(temp_file, file_path).map_err(to_save_err)?;

        // Parse back from the in-memory buffer rather than re-reading the
        // file we just wrote.
        ReadonlyTable::load_from(&mut buf.as_slice(), store, file_id_hex, store.key_size)
    }
}
377
378impl TableSegment for MutableTable {
379    fn segment_num_entries(&self) -> usize {
380        self.entries.len()
381    }
382
383    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
384        self.parent_file.as_ref()
385    }
386
387    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
388        self.entries.get(key).map(Vec::as_slice)
389    }
390
391    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
392        for (key, value) in &self.entries {
393            mut_table.add_entry(key.clone(), value.clone());
394        }
395    }
396}
397
/// Errors raised by [`TableStore`] operations.
#[derive(Debug, Error)]
pub enum TableStoreError {
    /// Failed to read the "heads" directory.
    #[error("Failed to load table heads")]
    LoadHeads(#[source] io::Error),
    /// Failed to write a head marker file.
    #[error("Failed to save table heads")]
    SaveHeads(#[source] io::Error),
    /// Failed to read or parse a segment file.
    #[error("Failed to load table segment '{name}'")]
    LoadSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Failed to write a segment file.
    #[error("Failed to save table segment '{name}'")]
    SaveSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Failed to take the store-wide file lock.
    #[error("Failed to lock table store")]
    Lock(#[source] FileLockError),
}

/// Result alias for [`TableStoreError`].
pub type TableStoreResult<T> = Result<T, TableStoreError>;
421
/// Manages a directory of stacked table segment files and the current set
/// of head segments (recorded as empty marker files under "heads").
pub struct TableStore {
    dir: PathBuf,
    // Length in bytes of the keys in all tables of this store
    key_size: usize,
    // In-memory cache of loaded segments, keyed by segment file name
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}
427
impl TableStore {
    /// Creates a new store in `dir` (which must already exist).
    ///
    /// Panics if the "heads" directory cannot be created, e.g. if the store
    /// was already initialized there.
    pub fn init(dir: PathBuf, key_size: usize) -> Self {
        std::fs::create_dir(dir.join("heads")).unwrap();
        Self {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Drops all current heads, making the store effectively empty. Existing
    /// segment files are left on disk.
    pub fn reinit(&self) {
        std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
        Self::init(self.dir.clone(), self.key_size);
    }

    pub fn key_size(&self) -> usize {
        self.key_size
    }

    /// Opens an existing store in `dir` without touching the disk.
    pub fn load(dir: PathBuf, key_size: usize) -> Self {
        Self {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Saves `mut_table` to disk and records the result as a head, replacing
    /// the table's parent as head when the save produced a new segment.
    pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
        let maybe_parent_table = mut_table.parent_file.clone();
        let table = mut_table.save_in(self)?;
        self.add_head(&table)?;
        // If saving just returned the parent unchanged (no new entries),
        // the names match and the head is kept as is.
        if let Some(parent_table) = maybe_parent_table
            && parent_table.name != table.name
        {
            self.remove_head(&parent_table);
        }
        {
            let mut locked_cache = self.cached_tables.write().unwrap();
            locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    // Records `table` as a head by creating an empty marker file.
    fn add_head(&self, table: &Arc<ReadonlyTable>) -> TableStoreResult<()> {
        std::fs::write(self.dir.join("heads").join(&table.name), "")
            .map_err(TableStoreError::SaveHeads)
    }

    fn remove_head(&self, table: &Arc<ReadonlyTable>) {
        // It's fine if the old head was not found. It probably means
        // that we're on a distributed file system where the locking
        // doesn't work. We'll probably end up with two current
        // heads. We'll detect that next time we load the table.
        std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
    }

    // Takes the store-wide file lock ("lock" file in the store directory).
    fn lock(&self) -> TableStoreResult<FileLock> {
        FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
    }

    /// Loads (or fetches from cache) the segment file called `name`,
    /// including its parent chain.
    fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
        {
            let read_locked_cached = self.cached_tables.read().unwrap();
            if let Some(table) = read_locked_cached.get(&name).cloned() {
                return Ok(table);
            }
        }
        let to_load_err = |err| TableStoreError::LoadSegment {
            name: name.clone(),
            err,
        };
        let table_file_path = self.dir.join(&name);
        let mut table_file = File::open(table_file_path).map_err(to_load_err)?;
        let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
        {
            let mut write_locked_cache = self.cached_tables.write().unwrap();
            write_locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    // Loads all tables currently recorded as heads.
    fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
        let mut tables = vec![];
        for head_entry in
            std::fs::read_dir(self.dir.join("heads")).map_err(TableStoreError::LoadHeads)?
        {
            let head_file_name = head_entry.map_err(TableStoreError::LoadHeads)?.file_name();
            let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
            tables.push(table);
        }
        Ok(tables)
    }

    /// Returns the single current head table, creating an empty one if the
    /// store has none and merging divergent heads if there are several.
    pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            self.save_table(empty_table)
        } else if tables.len() == 1 {
            Ok(tables.pop().unwrap())
        } else {
            // There are multiple heads. We take a lock, then check if there are still
            // multiple heads (it's likely that another process was in the process of
            // deleting one of them). If there are still multiple heads, we attempt to
            // merge all the tables into one. We then save that table and record the new
            // head. Note that the locking isn't necessary for correctness; we
            // take the lock only to prevent other concurrent processes from doing
            // the same work (and producing another set of divergent heads).
            let (table, _) = self.get_head_locked()?;
            Ok(table)
        }
    }

    /// Like `get_head()`, but holds the store lock while resolving the head
    /// and returns the lock so the caller can keep it across further writes.
    pub fn get_head_locked(&self) -> TableStoreResult<(Arc<ReadonlyTable>, FileLock)> {
        let lock = self.lock()?;
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            let table = self.save_table(empty_table)?;
            return Ok((table, lock));
        }

        if tables.len() == 1 {
            // Return early so we don't write a table with no changes compared to its parent
            return Ok((tables.pop().unwrap(), lock));
        }

        // Merge the divergent heads into one table, save it, and drop the
        // now-obsolete head markers.
        let mut merged_table = MutableTable::incremental(tables[0].clone());
        for other in &tables[1..] {
            merged_table.merge_in(other);
        }
        let merged_table = self.save_table(merged_table)?;
        for table in &tables[1..] {
            self.remove_head(table);
        }
        Ok((merged_table, lock))
    }

    /// Prunes unreachable table segments.
    ///
    /// All table segments reachable from the `head` won't be removed. In
    /// addition to that, segments created after `keep_newer` will be
    /// preserved. This mitigates a risk of deleting new segments created
    /// concurrently by another process.
    ///
    /// The caller may decide whether to lock the store by `get_head_locked()`.
    /// It's generally safe to run `gc()` without locking so long as the
    /// `keep_newer` time is reasonably old, and all writers reload table
    /// segments by `get_head_locked()` before adding new entries.
    #[tracing::instrument(skip(self, head))]
    pub fn gc(&self, head: &Arc<ReadonlyTable>, keep_newer: SystemTime) -> Result<(), PathError> {
        let read_locked_cache = self.cached_tables.read().unwrap();
        let reachable_tables: HashSet<&str> = itertools::chain(
            head.ancestor_segments(),
            // Also preserve cached segments so these segments can still be
            // loaded from the disk.
            read_locked_cache.values(),
        )
        .map(|table| table.name())
        .collect();

        let remove_file_if_not_new = |entry: &fs::DirEntry| -> Result<(), PathError> {
            let path = entry.path();
            // Check timestamp, but there's still TOCTOU problem if an existing
            // file is replaced with new file of the same name.
            let metadata = entry.metadata().context(&path)?;
            let mtime = metadata.modified().expect("unsupported platform?");
            if mtime > keep_newer {
                tracing::trace!(?path, "not removing");
                Ok(())
            } else {
                tracing::trace!(?path, "removing");
                fs::remove_file(&path).context(&path)
            }
        };

        for entry in self.dir.read_dir().context(&self.dir)? {
            let entry = entry.context(&self.dir)?;
            let file_name = entry.file_name();
            // Skip the store's bookkeeping files.
            if file_name == "heads" || file_name == "lock" {
                continue;
            }
            // Only hash-named files of the expected length are candidates.
            let Some(table_name) = file_name
                .to_str()
                .filter(|name| name.len() == SEGMENT_FILE_NAME_LENGTH)
            else {
                tracing::trace!(?entry, "skipping invalid file name");
                continue;
            };
            if reachable_tables.contains(table_name) {
                continue;
            }
            remove_file_if_not_new(&entry)?;
        }
        Ok(())
    }
}
627
#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;
    use crate::tests::new_temp_dir;

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        let mut _saved_table = None;
        // Exercise either the in-memory MutableTable or the saved
        // ReadonlyTable through the common TableSegment interface.
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Cannot find any keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"aaa"), None);
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        // Inserted out of order; lookups rely on the sorted index.
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        // Can find expected keys
        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abb"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(table.get_value(b"abe"), None);
        assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        // Build a stack of segment files so lookups traverse parents.
        for round in 0..10 {
            for i in 0..10 {
                mut_table.add_entry(
                    format!("x{i}{round}").into_bytes(),
                    format!("value {i}{round}").into_bytes(),
                );
            }
            let saved_table = store.save_table(mut_table).unwrap();
            mut_table = MutableTable::incremental(saved_table);
        }

        // Can find expected keys
        assert_eq!(mut_table.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table.get_value(b"x.."), None);
        assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
        assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
        assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
        assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
        assert_eq!(mut_table.get_value(b"xAA"), None);
        assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        // Two divergent tables on the same base; merge table1 into table2.
        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        let table1 = store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        mut_table2.merge_in(&table1);

        // Can find expected keys
        assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins, but let's test how it currently behaves.
        assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
        assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_automatic_merge() {
        // Same test as above, but here we let the store do the merging on load
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        let table2 = store.save_table(mut_table2).unwrap();

        // The saved table does not have the keys from table1
        assert_eq!(table2.get_value(b"abd"), None);

        // Can find expected keys in the merged table we get from get_head()
        let merged_table = store.get_head().unwrap();
        assert_eq!(merged_table.get_value(b"\0\0\0"), None);
        assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
        // The caller shouldn't write two values for the same key, so it's undefined
        // which wins.
        let value_mmm = merged_table.get_value(b"mmm");
        assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
        assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_store_save_empty() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);

        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        store.save_table(mut_table).unwrap();

        // Saving a mutation with no new entries should be a no-op.
        let mut_table = store.get_head().unwrap().start_mutation();
        store.save_table(mut_table).unwrap();

        // Table head shouldn't be removed on empty save
        let table = store.get_head().unwrap();
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
    }
}