1#![allow(missing_docs)]
24
25use std::cmp::Ordering;
26use std::collections::BTreeMap;
27use std::collections::HashMap;
28use std::fs::File;
29use std::io;
30use std::io::Read;
31use std::io::Write as _;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::sync::RwLock;
35
36use blake2::Blake2b512;
37use blake2::Digest as _;
38use tempfile::NamedTempFile;
39use thiserror::Error;
40
41use crate::file_util::persist_content_addressed_temp_file;
42use crate::hex_util;
43use crate::lock::FileLock;
44use crate::lock::FileLockError;
45
46pub trait TableSegment {
47 fn segment_num_entries(&self) -> usize;
48 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
49 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
50 fn segment_add_entries_to(&self, mut_table: &mut MutableTable);
51
52 fn num_entries(&self) -> usize {
53 if let Some(parent_file) = self.segment_parent_file() {
54 parent_file.num_entries() + self.segment_num_entries()
55 } else {
56 self.segment_num_entries()
57 }
58 }
59
60 fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
61 self.segment_get_value(key)
62 .or_else(|| self.segment_parent_file()?.get_value(key))
63 }
64}
65
/// An immutable table segment, as produced by parsing its serialized form.
pub struct ReadonlyTable {
    /// Length in bytes of every key in this table.
    key_size: usize,
    /// The segment this one is stacked on top of, if any.
    parent_file: Option<Arc<ReadonlyTable>>,
    /// Name the segment was loaded under. Segments written by
    /// `MutableTable::save_in` are named after the hex-encoded hash of their
    /// serialized bytes.
    name: String,
    /// Number of entries stored in this segment itself (ancestors excluded).
    num_local_entries: usize,
    /// Index section: `num_local_entries` fixed-size records, each a key
    /// followed by a 4-byte little-endian offset into `values`.
    index: Vec<u8>,
    /// Concatenated value bytes; the offsets in `index` mark where each
    /// value starts.
    values: Vec<u8>,
}
76
77impl ReadonlyTable {
78 fn load_from(
79 file: &mut dyn Read,
80 store: &TableStore,
81 name: String,
82 key_size: usize,
83 ) -> TableStoreResult<Arc<ReadonlyTable>> {
84 let to_load_err = |err| TableStoreError::LoadSegment {
85 name: name.clone(),
86 err,
87 };
88 let read_u32 = |file: &mut dyn Read| -> TableStoreResult<u32> {
89 let mut buf = [0; 4];
90 file.read_exact(&mut buf).map_err(to_load_err)?;
91 Ok(u32::from_le_bytes(buf))
92 };
93 let parent_filename_len = read_u32(file)?;
94 let maybe_parent_file = if parent_filename_len > 0 {
95 let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
96 file.read_exact(&mut parent_filename_bytes)
97 .map_err(to_load_err)?;
98 let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
99 let parent_file = store.load_table(parent_filename)?;
100 Some(parent_file)
101 } else {
102 None
103 };
104 let num_local_entries = read_u32(file)? as usize;
105 let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
106 let mut data = vec![];
107 file.read_to_end(&mut data).map_err(to_load_err)?;
108 let values = data.split_off(index_size);
109 let index = data;
110 Ok(Arc::new(ReadonlyTable {
111 key_size,
112 parent_file: maybe_parent_file,
113 name,
114 num_local_entries,
115 index,
116 values,
117 }))
118 }
119
120 pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
121 MutableTable::incremental(self.clone())
122 }
123
124 fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
125 if pos == self.num_local_entries {
126 self.values.len()
127 } else {
128 ReadonlyTableIndexEntry::new(self, pos).value_offset()
129 }
130 }
131
132 fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
133 &self.values
134 [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
135 }
136}
137
138impl TableSegment for ReadonlyTable {
139 fn segment_num_entries(&self) -> usize {
140 self.num_local_entries
141 }
142
143 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
144 self.parent_file.as_ref()
145 }
146
147 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
148 let mut low_pos = 0;
149 let mut high_pos = self.num_local_entries;
150 loop {
151 if high_pos == low_pos {
152 return None;
153 }
154 let mid_pos = (low_pos + high_pos) / 2;
155 let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
156 match key.cmp(mid_entry.key()) {
157 Ordering::Less => {
158 high_pos = mid_pos;
159 }
160 Ordering::Equal => {
161 return Some(self.segment_value_by_pos(mid_pos));
162 }
163 Ordering::Greater => {
164 low_pos = mid_pos + 1;
165 }
166 }
167 }
168 }
169
170 fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
171 for pos in 0..self.num_local_entries {
172 let entry = ReadonlyTableIndexEntry::new(self, pos);
173 mut_table.add_entry(
174 entry.key().to_vec(),
175 self.segment_value_by_pos(pos).to_vec(),
176 );
177 }
178 }
179}
180
/// A borrowed view of one fixed-size record in a `ReadonlyTable` index.
struct ReadonlyTableIndexEntry<'table> {
    /// The record's bytes: the key followed by a 4-byte little-endian
    /// value offset.
    data: &'table [u8],
}
184
185impl<'table> ReadonlyTableIndexEntry<'table> {
186 fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
187 let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
188 let offset = entry_size * pos;
189 let data = &table.index[offset..][..entry_size];
190 ReadonlyTableIndexEntry { data }
191 }
192
193 fn size(key_size: usize) -> usize {
194 key_size + 4
195 }
196
197 fn key(&self) -> &'table [u8] {
198 &self.data[0..self.data.len() - 4]
199 }
200
201 fn value_offset(&self) -> usize {
202 u32::from_le_bytes(self.data[self.data.len() - 4..].try_into().unwrap()) as usize
203 }
204}
205
/// An in-memory, writable table layer, optionally stacked on a saved table.
pub struct MutableTable {
    /// Required length in bytes of every key; `add_entry` asserts it.
    key_size: usize,
    /// The saved table this layer is stacked on, if any.
    parent_file: Option<Arc<ReadonlyTable>>,
    /// Entries added to this layer, ordered by key.
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}
211
212impl MutableTable {
213 fn full(key_size: usize) -> Self {
214 Self {
215 key_size,
216 parent_file: None,
217 entries: BTreeMap::new(),
218 }
219 }
220
221 fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
222 let key_size = parent_file.key_size;
223 Self {
224 key_size,
225 parent_file: Some(parent_file),
226 entries: BTreeMap::new(),
227 }
228 }
229
230 pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
231 assert_eq!(key.len(), self.key_size);
232 self.entries.insert(key, value);
233 }
234
235 fn add_entries_from(&mut self, other: &dyn TableSegment) {
236 other.segment_add_entries_to(self);
237 }
238
239 fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
240 let mut maybe_own_ancestor = self.parent_file.clone();
241 let mut maybe_other_ancestor = Some(other.clone());
242 let mut files_to_add = vec![];
243 loop {
244 if maybe_other_ancestor.is_none() {
245 break;
246 }
247 let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
248 if maybe_own_ancestor.is_none() {
249 files_to_add.push(other_ancestor.clone());
250 maybe_other_ancestor = other_ancestor.parent_file.clone();
251 continue;
252 }
253 let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
254 if own_ancestor.name == other_ancestor.name {
255 break;
256 }
257 if own_ancestor.num_entries() < other_ancestor.num_entries() {
258 files_to_add.push(other_ancestor.clone());
259 maybe_other_ancestor = other_ancestor.parent_file.clone();
260 } else {
261 maybe_own_ancestor = own_ancestor.parent_file.clone();
262 }
263 }
264
265 for file in files_to_add.iter().rev() {
266 self.add_entries_from(file.as_ref());
267 }
268 }
269
270 fn serialize(self) -> Vec<u8> {
271 let mut buf = vec![];
272
273 if let Some(parent_file) = &self.parent_file {
274 buf.extend(u32::try_from(parent_file.name.len()).unwrap().to_le_bytes());
275 buf.extend_from_slice(parent_file.name.as_bytes());
276 } else {
277 buf.extend(0_u32.to_le_bytes());
278 }
279
280 buf.extend(u32::try_from(self.entries.len()).unwrap().to_le_bytes());
281
282 let mut value_offset = 0_u32;
283 for (key, value) in &self.entries {
284 buf.extend_from_slice(key);
285 buf.extend(value_offset.to_le_bytes());
286 value_offset += u32::try_from(value.len()).unwrap();
287 }
288 for value in self.entries.values() {
289 buf.extend_from_slice(value);
290 }
291 buf
292 }
293
294 #[expect(clippy::assigning_clones)]
298 fn maybe_squash_with_ancestors(self) -> MutableTable {
299 let mut num_new_entries = self.entries.len();
300 let mut files_to_squash = vec![];
301 let mut maybe_parent_file = self.parent_file.clone();
302 let mut squashed;
303 loop {
304 match maybe_parent_file {
305 Some(parent_file) => {
306 if 2 * num_new_entries < parent_file.num_local_entries {
309 squashed = MutableTable::incremental(parent_file);
310 break;
311 }
312 num_new_entries += parent_file.num_local_entries;
313 files_to_squash.push(parent_file.clone());
314 maybe_parent_file = parent_file.parent_file.clone();
315 }
316 None => {
317 squashed = MutableTable::full(self.key_size);
318 break;
319 }
320 }
321 }
322
323 if files_to_squash.is_empty() {
324 return self;
325 }
326
327 for parent_file in files_to_squash.iter().rev() {
328 squashed.add_entries_from(parent_file.as_ref());
329 }
330 squashed.add_entries_from(&self);
331 squashed
332 }
333
334 fn save_in(self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
335 if self.entries.is_empty() && self.parent_file.is_some() {
336 return Ok(self.parent_file.unwrap());
337 }
338
339 let buf = self.maybe_squash_with_ancestors().serialize();
340 let mut hasher = Blake2b512::new();
341 hasher.update(&buf);
342 let file_id_hex = hex_util::encode_hex(&hasher.finalize());
343 let file_path = store.dir.join(&file_id_hex);
344 let to_save_err = |err| TableStoreError::SaveSegment {
345 name: file_id_hex.clone(),
346 err,
347 };
348
349 let mut temp_file = NamedTempFile::new_in(&store.dir).map_err(to_save_err)?;
350 let file = temp_file.as_file_mut();
351 file.write_all(&buf).map_err(to_save_err)?;
352 persist_content_addressed_temp_file(temp_file, file_path).map_err(to_save_err)?;
353
354 ReadonlyTable::load_from(&mut buf.as_slice(), store, file_id_hex, store.key_size)
355 }
356}
357
358impl TableSegment for MutableTable {
359 fn segment_num_entries(&self) -> usize {
360 self.entries.len()
361 }
362
363 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
364 self.parent_file.as_ref()
365 }
366
367 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
368 self.entries.get(key).map(Vec::as_slice)
369 }
370
371 fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
372 for (key, value) in &self.entries {
373 mut_table.add_entry(key.clone(), value.clone());
374 }
375 }
376}
377
/// Errors returned by `TableStore` operations.
#[derive(Debug, Error)]
pub enum TableStoreError {
    /// Reading the `heads` directory, or one of its entries, failed.
    #[error("Failed to load table heads")]
    LoadHeads(#[source] io::Error),
    /// Writing a head marker file failed.
    #[error("Failed to save table heads")]
    SaveHeads(#[source] io::Error),
    /// Opening or reading the table segment called `name` failed.
    #[error("Failed to load table segment '{name}'")]
    LoadSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Creating, writing, or persisting the file for the table segment
    /// called `name` failed.
    #[error("Failed to save table segment '{name}'")]
    SaveSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Acquiring the store's file lock failed.
    #[error("Failed to lock table store")]
    Lock(#[source] FileLockError),
}
399
/// Result type whose error is always a [`TableStoreError`].
pub type TableStoreResult<T> = Result<T, TableStoreError>;
401
/// A directory of saved table segments plus a `heads` subdirectory whose
/// file names identify the current head tables.
pub struct TableStore {
    /// Directory holding the segment files, the `heads` subdirectory, and
    /// the `lock` file.
    dir: PathBuf,
    /// Key length in bytes passed along when loading or creating tables.
    key_size: usize,
    /// Tables already loaded or saved through this store, keyed by name.
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}
407
408impl TableStore {
409 pub fn init(dir: PathBuf, key_size: usize) -> Self {
410 std::fs::create_dir(dir.join("heads")).unwrap();
411 TableStore {
412 dir,
413 key_size,
414 cached_tables: Default::default(),
415 }
416 }
417
418 pub fn reinit(&self) {
419 std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
420 TableStore::init(self.dir.clone(), self.key_size);
421 }
422
423 pub fn key_size(&self) -> usize {
424 self.key_size
425 }
426
427 pub fn load(dir: PathBuf, key_size: usize) -> Self {
428 TableStore {
429 dir,
430 key_size,
431 cached_tables: Default::default(),
432 }
433 }
434
435 pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
436 let maybe_parent_table = mut_table.parent_file.clone();
437 let table = mut_table.save_in(self)?;
438 self.add_head(&table)?;
439 if let Some(parent_table) = maybe_parent_table {
440 if parent_table.name != table.name {
441 self.remove_head(&parent_table);
442 }
443 }
444 {
445 let mut locked_cache = self.cached_tables.write().unwrap();
446 locked_cache.insert(table.name.clone(), table.clone());
447 }
448 Ok(table)
449 }
450
451 fn add_head(&self, table: &Arc<ReadonlyTable>) -> TableStoreResult<()> {
452 std::fs::write(self.dir.join("heads").join(&table.name), "")
453 .map_err(TableStoreError::SaveHeads)
454 }
455
456 fn remove_head(&self, table: &Arc<ReadonlyTable>) {
457 std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
462 }
463
464 fn lock(&self) -> TableStoreResult<FileLock> {
465 FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
466 }
467
468 fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
469 {
470 let read_locked_cached = self.cached_tables.read().unwrap();
471 if let Some(table) = read_locked_cached.get(&name).cloned() {
472 return Ok(table);
473 }
474 }
475 let to_load_err = |err| TableStoreError::LoadSegment {
476 name: name.clone(),
477 err,
478 };
479 let table_file_path = self.dir.join(&name);
480 let mut table_file = File::open(table_file_path).map_err(to_load_err)?;
481 let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
482 {
483 let mut write_locked_cache = self.cached_tables.write().unwrap();
484 write_locked_cache.insert(table.name.clone(), table.clone());
485 }
486 Ok(table)
487 }
488
489 fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
490 let mut tables = vec![];
491 for head_entry in
492 std::fs::read_dir(self.dir.join("heads")).map_err(TableStoreError::LoadHeads)?
493 {
494 let head_file_name = head_entry.map_err(TableStoreError::LoadHeads)?.file_name();
495 let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
496 tables.push(table);
497 }
498 Ok(tables)
499 }
500
501 pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
502 let mut tables = self.get_head_tables()?;
503
504 if tables.is_empty() {
505 let empty_table = MutableTable::full(self.key_size);
506 self.save_table(empty_table)
507 } else if tables.len() == 1 {
508 Ok(tables.pop().unwrap())
509 } else {
510 let (table, _) = self.get_head_locked()?;
518 Ok(table)
519 }
520 }
521
522 pub fn get_head_locked(&self) -> TableStoreResult<(Arc<ReadonlyTable>, FileLock)> {
523 let lock = self.lock()?;
524 let mut tables = self.get_head_tables()?;
525
526 if tables.is_empty() {
527 let empty_table = MutableTable::full(self.key_size);
528 let table = self.save_table(empty_table)?;
529 return Ok((table, lock));
530 }
531
532 if tables.len() == 1 {
533 return Ok((tables.pop().unwrap(), lock));
535 }
536
537 let mut merged_table = MutableTable::incremental(tables[0].clone());
538 for other in &tables[1..] {
539 merged_table.merge_in(other);
540 }
541 let merged_table = self.save_table(merged_table)?;
542 for table in &tables[1..] {
543 self.remove_head(table);
544 }
545 Ok((merged_table, lock))
546 }
547}
548
#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;
    use crate::tests::new_temp_dir;

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        // Owns the saved table (if any) for as long as `table` borrows it.
        let saved_table: Arc<ReadonlyTable>;
        let table: &dyn TableSegment = if on_disk {
            saved_table = store.save_table(mut_table).unwrap();
            &*saved_table
        } else {
            &mut_table
        };

        // Nothing was added, so no key can be found.
        for key in [b"\0\0\0", b"aaa", b"\xff\xff\xff"] {
            assert_eq!(table.get_value(key), None);
        }
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        // Owns the saved table (if any) for as long as `table` borrows it.
        let saved_table: Arc<ReadonlyTable>;
        let table: &dyn TableSegment = if on_disk {
            saved_table = store.save_table(mut_table).unwrap();
            &*saved_table
        } else {
            &mut_table
        };

        // Only the one key that was added can be found.
        for key in [b"\0\0\0", b"\xff\xff\xff"] {
            assert_eq!(table.get_value(key), None);
        }
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        // Deliberately not added in key order.
        for (key, value) in [
            (b"zzz", b"val3".as_slice()),
            (b"abc", b"value1".as_slice()),
            (b"abd", b"value 2".as_slice()),
        ] {
            mut_table.add_entry(key.to_vec(), value.to_vec());
        }
        // Owns the saved table (if any) for as long as `table` borrows it.
        let saved_table: Arc<ReadonlyTable>;
        let table: &dyn TableSegment = if on_disk {
            saved_table = store.save_table(mut_table).unwrap();
            &*saved_table
        } else {
            &mut_table
        };

        // Keys around and between the added ones are absent.
        for key in [b"\0\0\0", b"abb", b"abe", b"\xff\xff\xff"] {
            assert_eq!(table.get_value(key), None);
        }
        // Every added key maps to its own value.
        for (key, value) in [
            (b"abc", b"value1".as_slice()),
            (b"abd", b"value 2".as_slice()),
            (b"zzz", b"val3".as_slice()),
        ] {
            assert_eq!(table.get_value(key), Some(value));
        }
    }

    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        for (key, value) in [
            (b"abd", b"value 2".as_slice()),
            (b"abc", b"value1".as_slice()),
            (b"zzz", b"val3".as_slice()),
        ] {
            mut_table.add_entry(key.to_vec(), value.to_vec());
        }
        // Save ten times, adding ten keys before each save, then continue
        // on top of whatever table the save returned.
        for round in 0..10 {
            for i in 0..10 {
                let key = format!("x{i}{round}").into_bytes();
                let value = format!("value {i}{round}").into_bytes();
                mut_table.add_entry(key, value);
            }
            mut_table = store.save_table(mut_table).unwrap().start_mutation();
        }

        // Keys that were never added are absent.
        for key in [b"\0\0\0", b"x..", b"xAA", b"\xff\xff\xff"] {
            assert_eq!(mut_table.get_value(key), None);
        }
        // Keys from different rounds are all reachable through the stack.
        for (key, value) in [
            (b"x14", b"value 14".as_slice()),
            (b"x41", b"value 41".as_slice()),
            (b"x49", b"value 49".as_slice()),
            (b"x94", b"value 94".as_slice()),
        ] {
            assert_eq!(mut_table.get_value(key), Some(value));
        }
    }

    #[test]
    fn stacked_table_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        // First side: saved on top of the base.
        let mut mut_table1 = base_table.start_mutation();
        for (key, value) in [
            (b"abd", b"value 2".as_slice()),
            (b"zzz", b"val3".as_slice()),
            (b"mmm", b"side 1".as_slice()),
        ] {
            mut_table1.add_entry(key.to_vec(), value.to_vec());
        }
        let table1 = store.save_table(mut_table1).unwrap();

        // Second side: kept in memory on top of the same base, then merged
        // with the first side.
        let mut mut_table2 = base_table.start_mutation();
        for (key, value) in [
            (b"yyy", b"val5".as_slice()),
            (b"mmm", b"side 2".as_slice()),
            (b"abe", b"value 4".as_slice()),
        ] {
            mut_table2.add_entry(key.to_vec(), value.to_vec());
        }
        mut_table2.merge_in(&table1);

        for key in [b"\0\0\0", b"\xff\xff\xff"] {
            assert_eq!(mut_table2.get_value(key), None);
        }
        // `mmm` was written on both sides with different values; this pins
        // the value that currently wins after `merge_in`.
        for (key, value) in [
            (b"abc", b"value1".as_slice()),
            (b"abd", b"value 2".as_slice()),
            (b"abe", b"value 4".as_slice()),
            (b"mmm", b"side 1".as_slice()),
            (b"yyy", b"val5".as_slice()),
            (b"zzz", b"val3".as_slice()),
        ] {
            assert_eq!(mut_table2.get_value(key), Some(value));
        }
    }

    #[test]
    fn stacked_table_automatic_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        // Two tables saved independently on top of the same base.
        let mut mut_table1 = base_table.start_mutation();
        for (key, value) in [
            (b"abd", b"value 2".as_slice()),
            (b"zzz", b"val3".as_slice()),
            (b"mmm", b"side 1".as_slice()),
        ] {
            mut_table1.add_entry(key.to_vec(), value.to_vec());
        }
        store.save_table(mut_table1).unwrap();

        let mut mut_table2 = base_table.start_mutation();
        for (key, value) in [
            (b"yyy", b"val5".as_slice()),
            (b"mmm", b"side 2".as_slice()),
            (b"abe", b"value 4".as_slice()),
        ] {
            mut_table2.add_entry(key.to_vec(), value.to_vec());
        }
        let table2 = store.save_table(mut_table2).unwrap();

        // The second saved table does not contain keys from the first.
        assert_eq!(table2.get_value(b"abd"), None);

        // Asking for the head merges the two saved tables.
        let merged_table = store.get_head().unwrap();
        for key in [b"\0\0\0", b"\xff\xff\xff"] {
            assert_eq!(merged_table.get_value(key), None);
        }
        for (key, value) in [
            (b"abc", b"value1".as_slice()),
            (b"abd", b"value 2".as_slice()),
            (b"abe", b"value 4".as_slice()),
            (b"yyy", b"val5".as_slice()),
            (b"zzz", b"val3".as_slice()),
        ] {
            assert_eq!(merged_table.get_value(key), Some(value));
        }
        // Both sides wrote `mmm`; either value is accepted here.
        let value_mmm = merged_table.get_value(b"mmm");
        let accepted = [Some(b"side 1".as_slice()), Some(b"side 2".as_slice())];
        assert!(accepted.contains(&value_mmm));
    }

    #[test]
    fn stacked_table_store_save_empty() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);

        let mut first_layer = store.get_head().unwrap().start_mutation();
        first_layer.add_entry(b"abc".to_vec(), b"value".to_vec());
        store.save_table(first_layer).unwrap();

        // Saving a layer with no entries must not disturb the head.
        let empty_layer = store.get_head().unwrap().start_mutation();
        store.save_table(empty_layer).unwrap();

        // The earlier entry is still reachable from the head.
        let head = store.get_head().unwrap();
        assert_eq!(head.get_value(b"abc"), Some(b"value".as_slice()));
    }
}