1#![allow(missing_docs)]
24
25use std::cmp::Ordering;
26use std::collections::BTreeMap;
27use std::collections::HashMap;
28use std::fs::File;
29use std::io;
30use std::io::Read;
31use std::io::Write;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::sync::RwLock;
35
36use blake2::Blake2b512;
37use blake2::Digest;
38use tempfile::NamedTempFile;
39use thiserror::Error;
40
41use crate::file_util::persist_content_addressed_temp_file;
42use crate::lock::FileLock;
43use crate::lock::FileLockError;
44
45pub trait TableSegment {
46 fn segment_num_entries(&self) -> usize;
47 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
48 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
49 fn segment_add_entries_to(&self, mut_table: &mut MutableTable);
50
51 fn num_entries(&self) -> usize {
52 if let Some(parent_file) = self.segment_parent_file() {
53 parent_file.num_entries() + self.segment_num_entries()
54 } else {
55 self.segment_num_entries()
56 }
57 }
58
59 fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
60 self.segment_get_value(key)
61 .or_else(|| self.segment_parent_file()?.get_value(key))
62 }
63}
64
65pub struct ReadonlyTable {
66 key_size: usize,
67 parent_file: Option<Arc<ReadonlyTable>>,
68 name: String,
69 num_local_entries: usize,
71 index: Vec<u8>,
73 values: Vec<u8>,
74}
75
76impl ReadonlyTable {
77 fn load_from(
78 file: &mut dyn Read,
79 store: &TableStore,
80 name: String,
81 key_size: usize,
82 ) -> TableStoreResult<Arc<ReadonlyTable>> {
83 let to_load_err = |err| TableStoreError::LoadSegment {
84 name: name.clone(),
85 err,
86 };
87 let read_u32 = |file: &mut dyn Read| -> TableStoreResult<u32> {
88 let mut buf = [0; 4];
89 file.read_exact(&mut buf).map_err(to_load_err)?;
90 Ok(u32::from_le_bytes(buf))
91 };
92 let parent_filename_len = read_u32(file)?;
93 let maybe_parent_file = if parent_filename_len > 0 {
94 let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
95 file.read_exact(&mut parent_filename_bytes)
96 .map_err(to_load_err)?;
97 let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
98 let parent_file = store.load_table(parent_filename)?;
99 Some(parent_file)
100 } else {
101 None
102 };
103 let num_local_entries = read_u32(file)? as usize;
104 let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
105 let mut data = vec![];
106 file.read_to_end(&mut data).map_err(to_load_err)?;
107 let values = data.split_off(index_size);
108 let index = data;
109 Ok(Arc::new(ReadonlyTable {
110 key_size,
111 parent_file: maybe_parent_file,
112 name,
113 num_local_entries,
114 index,
115 values,
116 }))
117 }
118
119 pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
120 MutableTable::incremental(self.clone())
121 }
122
123 fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
124 if pos == self.num_local_entries {
125 self.values.len()
126 } else {
127 ReadonlyTableIndexEntry::new(self, pos).value_offset()
128 }
129 }
130
131 fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
132 &self.values
133 [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
134 }
135}
136
137impl TableSegment for ReadonlyTable {
138 fn segment_num_entries(&self) -> usize {
139 self.num_local_entries
140 }
141
142 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
143 self.parent_file.as_ref()
144 }
145
146 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
147 let mut low_pos = 0;
148 let mut high_pos = self.num_local_entries;
149 loop {
150 if high_pos == low_pos {
151 return None;
152 }
153 let mid_pos = (low_pos + high_pos) / 2;
154 let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
155 match key.cmp(mid_entry.key()) {
156 Ordering::Less => {
157 high_pos = mid_pos;
158 }
159 Ordering::Equal => {
160 return Some(self.segment_value_by_pos(mid_pos));
161 }
162 Ordering::Greater => {
163 low_pos = mid_pos + 1;
164 }
165 }
166 }
167 }
168
169 fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
170 for pos in 0..self.num_local_entries {
171 let entry = ReadonlyTableIndexEntry::new(self, pos);
172 mut_table.add_entry(
173 entry.key().to_vec(),
174 self.segment_value_by_pos(pos).to_vec(),
175 );
176 }
177 }
178}
179
180struct ReadonlyTableIndexEntry<'table> {
181 data: &'table [u8],
182}
183
184impl<'table> ReadonlyTableIndexEntry<'table> {
185 fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
186 let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
187 let offset = entry_size * pos;
188 let data = &table.index[offset..][..entry_size];
189 ReadonlyTableIndexEntry { data }
190 }
191
192 fn size(key_size: usize) -> usize {
193 key_size + 4
194 }
195
196 fn key(&self) -> &'table [u8] {
197 &self.data[0..self.data.len() - 4]
198 }
199
200 fn value_offset(&self) -> usize {
201 u32::from_le_bytes(self.data[self.data.len() - 4..].try_into().unwrap()) as usize
202 }
203}
204
205pub struct MutableTable {
206 key_size: usize,
207 parent_file: Option<Arc<ReadonlyTable>>,
208 entries: BTreeMap<Vec<u8>, Vec<u8>>,
209}
210
211impl MutableTable {
212 fn full(key_size: usize) -> Self {
213 Self {
214 key_size,
215 parent_file: None,
216 entries: BTreeMap::new(),
217 }
218 }
219
220 fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
221 let key_size = parent_file.key_size;
222 Self {
223 key_size,
224 parent_file: Some(parent_file),
225 entries: BTreeMap::new(),
226 }
227 }
228
229 pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
230 assert_eq!(key.len(), self.key_size);
231 self.entries.insert(key, value);
232 }
233
234 fn add_entries_from(&mut self, other: &dyn TableSegment) {
235 other.segment_add_entries_to(self);
236 }
237
238 #[allow(unknown_lints)] #[allow(clippy::assigning_clones)]
240 fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
241 let mut maybe_own_ancestor = self.parent_file.clone();
242 let mut maybe_other_ancestor = Some(other.clone());
243 let mut files_to_add = vec![];
244 loop {
245 if maybe_other_ancestor.is_none() {
246 break;
247 }
248 let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
249 if maybe_own_ancestor.is_none() {
250 files_to_add.push(other_ancestor.clone());
251 maybe_other_ancestor = other_ancestor.parent_file.clone();
252 continue;
253 }
254 let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
255 if own_ancestor.name == other_ancestor.name {
256 break;
257 }
258 if own_ancestor.num_entries() < other_ancestor.num_entries() {
259 files_to_add.push(other_ancestor.clone());
260 maybe_other_ancestor = other_ancestor.parent_file.clone();
261 } else {
262 maybe_own_ancestor = own_ancestor.parent_file.clone();
263 }
264 }
265
266 for file in files_to_add.iter().rev() {
267 self.add_entries_from(file.as_ref());
268 }
269 }
270
271 fn serialize(self) -> Vec<u8> {
272 let mut buf = vec![];
273
274 if let Some(parent_file) = &self.parent_file {
275 buf.extend(u32::try_from(parent_file.name.len()).unwrap().to_le_bytes());
276 buf.extend_from_slice(parent_file.name.as_bytes());
277 } else {
278 buf.extend(0_u32.to_le_bytes());
279 }
280
281 buf.extend(u32::try_from(self.entries.len()).unwrap().to_le_bytes());
282
283 let mut value_offset = 0_u32;
284 for (key, value) in &self.entries {
285 buf.extend_from_slice(key);
286 buf.extend(value_offset.to_le_bytes());
287 value_offset += u32::try_from(value.len()).unwrap();
288 }
289 for value in self.entries.values() {
290 buf.extend_from_slice(value);
291 }
292 buf
293 }
294
295 #[allow(unknown_lints)] #[allow(clippy::assigning_clones)]
300 fn maybe_squash_with_ancestors(self) -> MutableTable {
301 let mut num_new_entries = self.entries.len();
302 let mut files_to_squash = vec![];
303 let mut maybe_parent_file = self.parent_file.clone();
304 let mut squashed;
305 loop {
306 match maybe_parent_file {
307 Some(parent_file) => {
308 if 2 * num_new_entries < parent_file.num_local_entries {
311 squashed = MutableTable::incremental(parent_file);
312 break;
313 }
314 num_new_entries += parent_file.num_local_entries;
315 files_to_squash.push(parent_file.clone());
316 maybe_parent_file = parent_file.parent_file.clone();
317 }
318 None => {
319 squashed = MutableTable::full(self.key_size);
320 break;
321 }
322 }
323 }
324
325 if files_to_squash.is_empty() {
326 return self;
327 }
328
329 for parent_file in files_to_squash.iter().rev() {
330 squashed.add_entries_from(parent_file.as_ref());
331 }
332 squashed.add_entries_from(&self);
333 squashed
334 }
335
336 fn save_in(self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
337 if self.entries.is_empty() && self.parent_file.is_some() {
338 return Ok(self.parent_file.unwrap());
339 }
340
341 let buf = self.maybe_squash_with_ancestors().serialize();
342 let mut hasher = Blake2b512::new();
343 hasher.update(&buf);
344 let file_id_hex = hex::encode(hasher.finalize());
345 let file_path = store.dir.join(&file_id_hex);
346 let to_save_err = |err| TableStoreError::SaveSegment {
347 name: file_id_hex.clone(),
348 err,
349 };
350
351 let mut temp_file = NamedTempFile::new_in(&store.dir).map_err(to_save_err)?;
352 let file = temp_file.as_file_mut();
353 file.write_all(&buf).map_err(to_save_err)?;
354 persist_content_addressed_temp_file(temp_file, file_path).map_err(to_save_err)?;
355
356 ReadonlyTable::load_from(&mut buf.as_slice(), store, file_id_hex, store.key_size)
357 }
358}
359
360impl TableSegment for MutableTable {
361 fn segment_num_entries(&self) -> usize {
362 self.entries.len()
363 }
364
365 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
366 self.parent_file.as_ref()
367 }
368
369 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
370 self.entries.get(key).map(Vec::as_slice)
371 }
372
373 fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
374 for (key, value) in &self.entries {
375 mut_table.add_entry(key.clone(), value.clone());
376 }
377 }
378}
379
380#[derive(Debug, Error)]
381pub enum TableStoreError {
382 #[error("Failed to load table heads")]
383 LoadHeads(#[source] io::Error),
384 #[error("Failed to save table heads")]
385 SaveHeads(#[source] io::Error),
386 #[error("Failed to load table segment '{name}'")]
387 LoadSegment {
388 name: String,
389 #[source]
390 err: io::Error,
391 },
392 #[error("Failed to save table segment '{name}'")]
393 SaveSegment {
394 name: String,
395 #[source]
396 err: io::Error,
397 },
398 #[error("Failed to lock table store")]
399 Lock(#[source] FileLockError),
400}
401
402pub type TableStoreResult<T> = Result<T, TableStoreError>;
403
404pub struct TableStore {
405 dir: PathBuf,
406 key_size: usize,
407 cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
408}
409
410impl TableStore {
411 pub fn init(dir: PathBuf, key_size: usize) -> Self {
412 std::fs::create_dir(dir.join("heads")).unwrap();
413 TableStore {
414 dir,
415 key_size,
416 cached_tables: Default::default(),
417 }
418 }
419
420 pub fn reinit(&self) {
421 std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
422 TableStore::init(self.dir.clone(), self.key_size);
423 }
424
425 pub fn key_size(&self) -> usize {
426 self.key_size
427 }
428
429 pub fn load(dir: PathBuf, key_size: usize) -> Self {
430 TableStore {
431 dir,
432 key_size,
433 cached_tables: Default::default(),
434 }
435 }
436
437 pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
438 let maybe_parent_table = mut_table.parent_file.clone();
439 let table = mut_table.save_in(self)?;
440 self.add_head(&table)?;
441 if let Some(parent_table) = maybe_parent_table {
442 if parent_table.name != table.name {
443 self.remove_head(&parent_table);
444 }
445 }
446 {
447 let mut locked_cache = self.cached_tables.write().unwrap();
448 locked_cache.insert(table.name.clone(), table.clone());
449 }
450 Ok(table)
451 }
452
453 fn add_head(&self, table: &Arc<ReadonlyTable>) -> TableStoreResult<()> {
454 std::fs::write(self.dir.join("heads").join(&table.name), "")
455 .map_err(TableStoreError::SaveHeads)
456 }
457
458 fn remove_head(&self, table: &Arc<ReadonlyTable>) {
459 std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
464 }
465
466 fn lock(&self) -> TableStoreResult<FileLock> {
467 FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
468 }
469
470 fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
471 {
472 let read_locked_cached = self.cached_tables.read().unwrap();
473 if let Some(table) = read_locked_cached.get(&name).cloned() {
474 return Ok(table);
475 }
476 }
477 let to_load_err = |err| TableStoreError::LoadSegment {
478 name: name.clone(),
479 err,
480 };
481 let table_file_path = self.dir.join(&name);
482 let mut table_file = File::open(table_file_path).map_err(to_load_err)?;
483 let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
484 {
485 let mut write_locked_cache = self.cached_tables.write().unwrap();
486 write_locked_cache.insert(table.name.clone(), table.clone());
487 }
488 Ok(table)
489 }
490
491 fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
492 let mut tables = vec![];
493 for head_entry in
494 std::fs::read_dir(self.dir.join("heads")).map_err(TableStoreError::LoadHeads)?
495 {
496 let head_file_name = head_entry.map_err(TableStoreError::LoadHeads)?.file_name();
497 let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
498 tables.push(table);
499 }
500 Ok(tables)
501 }
502
503 pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
504 let mut tables = self.get_head_tables()?;
505
506 if tables.is_empty() {
507 let empty_table = MutableTable::full(self.key_size);
508 self.save_table(empty_table)
509 } else if tables.len() == 1 {
510 Ok(tables.pop().unwrap())
511 } else {
512 let (table, _) = self.get_head_locked()?;
520 Ok(table)
521 }
522 }
523
524 pub fn get_head_locked(&self) -> TableStoreResult<(Arc<ReadonlyTable>, FileLock)> {
525 let lock = self.lock()?;
526 let mut tables = self.get_head_tables()?;
527
528 if tables.is_empty() {
529 let empty_table = MutableTable::full(self.key_size);
530 let table = self.save_table(empty_table)?;
531 return Ok((table, lock));
532 }
533
534 if tables.len() == 1 {
535 return Ok((tables.pop().unwrap(), lock));
537 }
538
539 let mut merged_table = MutableTable::incremental(tables[0].clone());
540 for other in &tables[1..] {
541 merged_table.merge_in(other);
542 }
543 let merged_table = self.save_table(merged_table)?;
544 for table in &tables[1..] {
545 self.remove_head(table);
546 }
547 Ok((merged_table, lock))
548 }
549}
550
551#[cfg(test)]
552mod tests {
553 use test_case::test_case;
554
555 use super::*;
556 use crate::tests::new_temp_dir;
557
558 #[test_case(false; "memory")]
559 #[test_case(true; "file")]
560 fn stacked_table_empty(on_disk: bool) {
561 let temp_dir = new_temp_dir();
562 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
563 let mut_table = store.get_head().unwrap().start_mutation();
564 let mut _saved_table = None;
565 let table: &dyn TableSegment = if on_disk {
566 _saved_table = Some(store.save_table(mut_table).unwrap());
567 _saved_table.as_ref().unwrap().as_ref()
568 } else {
569 &mut_table
570 };
571
572 assert_eq!(table.get_value(b"\0\0\0"), None);
574 assert_eq!(table.get_value(b"aaa"), None);
575 assert_eq!(table.get_value(b"\xff\xff\xff"), None);
576 }
577
578 #[test_case(false; "memory")]
579 #[test_case(true; "file")]
580 fn stacked_table_single_key(on_disk: bool) {
581 let temp_dir = new_temp_dir();
582 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
583 let mut mut_table = store.get_head().unwrap().start_mutation();
584 mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
585 let mut _saved_table = None;
586 let table: &dyn TableSegment = if on_disk {
587 _saved_table = Some(store.save_table(mut_table).unwrap());
588 _saved_table.as_ref().unwrap().as_ref()
589 } else {
590 &mut_table
591 };
592
593 assert_eq!(table.get_value(b"\0\0\0"), None);
595 assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
596 assert_eq!(table.get_value(b"\xff\xff\xff"), None);
597 }
598
599 #[test_case(false; "memory")]
600 #[test_case(true; "file")]
601 fn stacked_table_multiple_keys(on_disk: bool) {
602 let temp_dir = new_temp_dir();
603 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
604 let mut mut_table = store.get_head().unwrap().start_mutation();
605 mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
606 mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
607 mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
608 let mut _saved_table = None;
609 let table: &dyn TableSegment = if on_disk {
610 _saved_table = Some(store.save_table(mut_table).unwrap());
611 _saved_table.as_ref().unwrap().as_ref()
612 } else {
613 &mut_table
614 };
615
616 assert_eq!(table.get_value(b"\0\0\0"), None);
618 assert_eq!(table.get_value(b"abb"), None);
619 assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
620 assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
621 assert_eq!(table.get_value(b"abe"), None);
622 assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
623 assert_eq!(table.get_value(b"\xff\xff\xff"), None);
624 }
625
626 #[test]
627 fn stacked_table_multiple_keys_with_parent_file() {
628 let temp_dir = new_temp_dir();
629 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
630 let mut mut_table = store.get_head().unwrap().start_mutation();
631 mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
632 mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
633 mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
634 for round in 0..10 {
635 for i in 0..10 {
636 mut_table.add_entry(
637 format!("x{i}{round}").into_bytes(),
638 format!("value {i}{round}").into_bytes(),
639 );
640 }
641 let saved_table = store.save_table(mut_table).unwrap();
642 mut_table = MutableTable::incremental(saved_table);
643 }
644
645 assert_eq!(mut_table.get_value(b"\0\0\0"), None);
647 assert_eq!(mut_table.get_value(b"x.."), None);
648 assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
649 assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
650 assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
651 assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
652 assert_eq!(mut_table.get_value(b"xAA"), None);
653 assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
654 }
655
656 #[test]
657 fn stacked_table_merge() {
658 let temp_dir = new_temp_dir();
659 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
660 let mut mut_base_table = store.get_head().unwrap().start_mutation();
661 mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
662 let base_table = store.save_table(mut_base_table).unwrap();
663
664 let mut mut_table1 = MutableTable::incremental(base_table.clone());
665 mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
666 mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
667 mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
668 let table1 = store.save_table(mut_table1).unwrap();
669 let mut mut_table2 = MutableTable::incremental(base_table);
670 mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
671 mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
672 mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
673 mut_table2.merge_in(&table1);
674
675 assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
677 assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
678 assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
679 assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
680 assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
683 assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
684 assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
685 assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
686 }
687
688 #[test]
689 fn stacked_table_automatic_merge() {
690 let temp_dir = new_temp_dir();
692 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
693 let mut mut_base_table = store.get_head().unwrap().start_mutation();
694 mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
695 let base_table = store.save_table(mut_base_table).unwrap();
696
697 let mut mut_table1 = MutableTable::incremental(base_table.clone());
698 mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
699 mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
700 mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
701 store.save_table(mut_table1).unwrap();
702 let mut mut_table2 = MutableTable::incremental(base_table);
703 mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
704 mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
705 mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
706 let table2 = store.save_table(mut_table2).unwrap();
707
708 assert_eq!(table2.get_value(b"abd"), None);
710
711 let merged_table = store.get_head().unwrap();
713 assert_eq!(merged_table.get_value(b"\0\0\0"), None);
714 assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
715 assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
716 assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
717 let value_mmm = merged_table.get_value(b"mmm");
720 assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
721 assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
722 assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
723 assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
724 }
725
726 #[test]
727 fn stacked_table_store_save_empty() {
728 let temp_dir = new_temp_dir();
729 let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
730
731 let mut mut_table = store.get_head().unwrap().start_mutation();
732 mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
733 store.save_table(mut_table).unwrap();
734
735 let mut_table = store.get_head().unwrap().start_mutation();
736 store.save_table(mut_table).unwrap();
737
738 let table = store.get_head().unwrap();
740 assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
741 }
742}