1#![expect(missing_docs)]
24
25use std::cmp::Ordering;
26use std::collections::BTreeMap;
27use std::collections::HashMap;
28use std::collections::HashSet;
29use std::fs;
30use std::fs::File;
31use std::io;
32use std::io::Read;
33use std::io::Write as _;
34use std::iter;
35use std::path::PathBuf;
36use std::sync::Arc;
37use std::sync::RwLock;
38use std::time::SystemTime;
39
40use blake2::Blake2b512;
41use blake2::Digest as _;
42use tempfile::NamedTempFile;
43use thiserror::Error;
44
45use crate::file_util::IoResultExt as _;
46use crate::file_util::PathError;
47use crate::file_util::persist_content_addressed_temp_file;
48use crate::hex_util;
49use crate::lock::FileLock;
50use crate::lock::FileLockError;
51
52const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
54
55pub trait TableSegment {
56 fn segment_num_entries(&self) -> usize;
57 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
58 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
59 fn segment_add_entries_to(&self, mut_table: &mut MutableTable);
60
61 fn num_entries(&self) -> usize {
62 if let Some(parent_file) = self.segment_parent_file() {
63 parent_file.num_entries() + self.segment_num_entries()
64 } else {
65 self.segment_num_entries()
66 }
67 }
68
69 fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
70 self.segment_get_value(key)
71 .or_else(|| self.segment_parent_file()?.get_value(key))
72 }
73}
74
/// An immutable, on-disk segment of a stacked lookup table.
pub struct ReadonlyTable {
    // Size in bytes of every key in this table.
    key_size: usize,
    // Previous segment in the chain, if any; lookups fall back to it.
    parent_file: Option<Arc<Self>>,
    // File name of this segment (hex hash of its serialized content).
    name: String,
    // Number of entries stored in this segment alone.
    num_local_entries: usize,
    // Sorted fixed-size index records: `key_size` key bytes followed by a
    // little-endian u32 offset into `values`.
    index: Vec<u8>,
    // Concatenated value bytes, sliced via the offsets stored in `index`.
    values: Vec<u8>,
}
85
86impl ReadonlyTable {
87 fn load_from(
88 file: &mut dyn Read,
89 store: &TableStore,
90 name: String,
91 key_size: usize,
92 ) -> TableStoreResult<Arc<Self>> {
93 let to_load_err = |err| TableStoreError::LoadSegment {
94 name: name.clone(),
95 err,
96 };
97 let read_u32 = |file: &mut dyn Read| -> TableStoreResult<u32> {
98 let mut buf = [0; 4];
99 file.read_exact(&mut buf).map_err(to_load_err)?;
100 Ok(u32::from_le_bytes(buf))
101 };
102 let parent_filename_len = read_u32(file)?;
103 let maybe_parent_file = if parent_filename_len > 0 {
104 let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
105 file.read_exact(&mut parent_filename_bytes)
106 .map_err(to_load_err)?;
107 let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
108 let parent_file = store.load_table(parent_filename)?;
109 Some(parent_file)
110 } else {
111 None
112 };
113 let num_local_entries = read_u32(file)? as usize;
114 let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
115 let mut data = vec![];
116 file.read_to_end(&mut data).map_err(to_load_err)?;
117 let values = data.split_off(index_size);
118 let index = data;
119 Ok(Arc::new(Self {
120 key_size,
121 parent_file: maybe_parent_file,
122 name,
123 num_local_entries,
124 index,
125 values,
126 }))
127 }
128
129 pub fn name(&self) -> &str {
130 &self.name
131 }
132
133 pub fn ancestor_segments(self: &Arc<Self>) -> impl Iterator<Item = &Arc<Self>> {
135 iter::successors(Some(self), |table| table.segment_parent_file())
136 }
137
138 pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
139 MutableTable::incremental(self.clone())
140 }
141
142 fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
143 if pos == self.num_local_entries {
144 self.values.len()
145 } else {
146 ReadonlyTableIndexEntry::new(self, pos).value_offset()
147 }
148 }
149
150 fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
151 &self.values
152 [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
153 }
154}
155
156impl TableSegment for ReadonlyTable {
157 fn segment_num_entries(&self) -> usize {
158 self.num_local_entries
159 }
160
161 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
162 self.parent_file.as_ref()
163 }
164
165 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
166 let mut low_pos = 0;
167 let mut high_pos = self.num_local_entries;
168 loop {
169 if high_pos == low_pos {
170 return None;
171 }
172 let mid_pos = (low_pos + high_pos) / 2;
173 let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
174 match key.cmp(mid_entry.key()) {
175 Ordering::Less => {
176 high_pos = mid_pos;
177 }
178 Ordering::Equal => {
179 return Some(self.segment_value_by_pos(mid_pos));
180 }
181 Ordering::Greater => {
182 low_pos = mid_pos + 1;
183 }
184 }
185 }
186 }
187
188 fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
189 for pos in 0..self.num_local_entries {
190 let entry = ReadonlyTableIndexEntry::new(self, pos);
191 mut_table.add_entry(
192 entry.key().to_vec(),
193 self.segment_value_by_pos(pos).to_vec(),
194 );
195 }
196 }
197}
198
199struct ReadonlyTableIndexEntry<'table> {
200 data: &'table [u8],
201}
202
203impl<'table> ReadonlyTableIndexEntry<'table> {
204 fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
205 let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
206 let offset = entry_size * pos;
207 let data = &table.index[offset..][..entry_size];
208 Self { data }
209 }
210
211 fn size(key_size: usize) -> usize {
212 key_size + 4
213 }
214
215 fn key(&self) -> &'table [u8] {
216 &self.data[0..self.data.len() - 4]
217 }
218
219 fn value_offset(&self) -> usize {
220 u32::from_le_bytes(self.data[self.data.len() - 4..].try_into().unwrap()) as usize
221 }
222}
223
/// An in-memory, mutable segment of a stacked lookup table.
pub struct MutableTable {
    // Size in bytes of every key in this table.
    key_size: usize,
    // Readonly segment this table stacks on, if any.
    parent_file: Option<Arc<ReadonlyTable>>,
    // Local entries, key-ordered (ordering is relied on for serialization).
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}
229
impl MutableTable {
    /// Creates an empty table with no parent segment.
    fn full(key_size: usize) -> Self {
        Self {
            key_size,
            parent_file: None,
            entries: BTreeMap::new(),
        }
    }

    /// Creates an empty table stacked on `parent_file`, inheriting its key
    /// size.
    fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
        let key_size = parent_file.key_size;
        Self {
            key_size,
            parent_file: Some(parent_file),
            entries: BTreeMap::new(),
        }
    }

    /// Adds an entry, overwriting any existing local value for `key`.
    ///
    /// Panics if `key.len()` doesn't match the table's key size.
    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
        assert_eq!(key.len(), self.key_size);
        self.entries.insert(key, value);
    }

    /// Copies the local entries of `other` into this table.
    fn add_entries_from(&mut self, other: &dyn TableSegment) {
        other.segment_add_entries_to(self);
    }

    /// Merges the entries of `other` and its ancestors into this table.
    ///
    /// Walks both ancestor chains until a common ancestor (identified by
    /// file name) is found or `other`'s chain runs out, collecting the
    /// segments unique to `other`. Those are then replayed oldest-first so
    /// entries from newer segments win.
    fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
        let mut maybe_own_ancestor = self.parent_file.clone();
        let mut maybe_other_ancestor = Some(other.clone());
        let mut files_to_add = vec![];
        loop {
            if maybe_other_ancestor.is_none() {
                break;
            }
            let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
            if maybe_own_ancestor.is_none() {
                // Our chain is exhausted: everything remaining on the other
                // chain is new to us.
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
                continue;
            }
            let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
            if own_ancestor.name == other_ancestor.name {
                // Reached a shared segment; everything below it is common.
                break;
            }
            // Total entry counts never decrease along a chain, so the side
            // with more entries cannot be an ancestor of the other; descend
            // the larger side, remembering the other chain's segments.
            if own_ancestor.num_entries() < other_ancestor.num_entries() {
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
            } else {
                maybe_own_ancestor = own_ancestor.parent_file.clone();
            }
        }

        // Replay oldest-first so newer segments overwrite older values.
        for file in files_to_add.iter().rev() {
            self.add_entries_from(file.as_ref());
        }
    }

    /// Serializes the local entries into the on-disk segment format read by
    /// `ReadonlyTable::load_from`: parent name length (LE u32) and bytes,
    /// entry count (LE u32), the index (key bytes + LE u32 value offset per
    /// entry), then the concatenated values.
    fn serialize(self) -> Vec<u8> {
        let mut buf = vec![];

        if let Some(parent_file) = &self.parent_file {
            buf.extend(u32::try_from(parent_file.name.len()).unwrap().to_le_bytes());
            buf.extend_from_slice(parent_file.name.as_bytes());
        } else {
            // A zero-length name encodes "no parent segment".
            buf.extend(0_u32.to_le_bytes());
        }

        buf.extend(u32::try_from(self.entries.len()).unwrap().to_le_bytes());

        // BTreeMap iterates in key order, which keeps the serialized index
        // sorted for the reader's binary search.
        let mut value_offset = 0_u32;
        for (key, value) in &self.entries {
            buf.extend_from_slice(key);
            buf.extend(value_offset.to_le_bytes());
            value_offset += u32::try_from(value.len()).unwrap();
        }
        for value in self.entries.values() {
            buf.extend_from_slice(value);
        }
        buf
    }

    /// Folds parent segments into this table before saving when they are
    /// small relative to the accumulated entries (stops as soon as a parent
    /// has more than twice the accumulated count), bounding the length of
    /// the segment chain.
    #[expect(clippy::assigning_clones)]
    fn maybe_squash_with_ancestors(self) -> Self {
        let mut num_new_entries = self.entries.len();
        let mut files_to_squash = vec![];
        let mut maybe_parent_file = self.parent_file.clone();
        let mut squashed;
        loop {
            match maybe_parent_file {
                Some(parent_file) => {
                    // A much larger parent stays as the base instead of
                    // being rewritten.
                    if 2 * num_new_entries < parent_file.num_local_entries {
                        squashed = Self::incremental(parent_file);
                        break;
                    }
                    num_new_entries += parent_file.num_local_entries;
                    files_to_squash.push(parent_file.clone());
                    maybe_parent_file = parent_file.parent_file.clone();
                }
                None => {
                    squashed = Self::full(self.key_size);
                    break;
                }
            }
        }

        if files_to_squash.is_empty() {
            return self;
        }

        // Replay the squashed parents oldest-first, then our own entries,
        // so newer values win.
        for parent_file in files_to_squash.iter().rev() {
            squashed.add_entries_from(parent_file.as_ref());
        }
        squashed.add_entries_from(&self);
        squashed
    }

    /// Saves this table to `store` and returns the resulting readonly
    /// segment. If there are no local entries, the parent segment is
    /// returned unchanged instead of writing an identical file.
    fn save_in(mut self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
        if self.entries.is_empty()
            && let Some(parent_file) = self.parent_file.take()
        {
            return Ok(parent_file);
        }

        let buf = self.maybe_squash_with_ancestors().serialize();
        // The file name is the hash of the content: segment files are
        // content-addressed.
        let mut hasher = Blake2b512::new();
        hasher.update(&buf);
        let file_id_hex = hex_util::encode_hex(&hasher.finalize());
        let file_path = store.dir.join(&file_id_hex);
        let to_save_err = |err| TableStoreError::SaveSegment {
            name: file_id_hex.clone(),
            err,
        };

        // Write through a temp file in the same directory so a partially
        // written segment never appears under its final name.
        let mut temp_file = NamedTempFile::new_in(&store.dir).map_err(to_save_err)?;
        let file = temp_file.as_file_mut();
        file.write_all(&buf).map_err(to_save_err)?;
        persist_content_addressed_temp_file(temp_file, file_path).map_err(to_save_err)?;

        // Reload from the in-memory buffer rather than re-reading the file.
        ReadonlyTable::load_from(&mut buf.as_slice(), store, file_id_hex, store.key_size)
    }
}
377
378impl TableSegment for MutableTable {
379 fn segment_num_entries(&self) -> usize {
380 self.entries.len()
381 }
382
383 fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
384 self.parent_file.as_ref()
385 }
386
387 fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
388 self.entries.get(key).map(Vec::as_slice)
389 }
390
391 fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
392 for (key, value) in &self.entries {
393 mut_table.add_entry(key.clone(), value.clone());
394 }
395 }
396}
397
/// Error type for [`TableStore`] operations.
#[derive(Debug, Error)]
pub enum TableStoreError {
    /// Reading the "heads" directory failed.
    #[error("Failed to load table heads")]
    LoadHeads(#[source] io::Error),
    /// Writing a head marker file failed.
    #[error("Failed to save table heads")]
    SaveHeads(#[source] io::Error),
    /// Reading or parsing a segment file failed.
    #[error("Failed to load table segment '{name}'")]
    LoadSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Writing a segment file failed.
    #[error("Failed to save table segment '{name}'")]
    SaveSegment {
        name: String,
        #[source]
        err: io::Error,
    },
    /// Acquiring the store-wide file lock failed.
    #[error("Failed to lock table store")]
    Lock(#[source] FileLockError),
}

/// Result alias for [`TableStore`] operations.
pub type TableStoreResult<T> = Result<T, TableStoreError>;
421
/// Disk-backed store of stacked lookup tables.
///
/// Segment files are content-addressed and live directly inside `dir`; the
/// current head segments are tracked as empty marker files in `dir/heads`.
pub struct TableStore {
    dir: PathBuf,
    // Size in bytes of every key, shared by all tables in this store.
    key_size: usize,
    // In-memory cache of loaded segments, keyed by segment file name.
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}
427
impl TableStore {
    /// Creates a new store in `dir` (which must already exist).
    ///
    /// Panics if the "heads" directory cannot be created.
    pub fn init(dir: PathBuf, key_size: usize) -> Self {
        std::fs::create_dir(dir.join("heads")).unwrap();
        Self {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Discards all heads by deleting and re-creating the "heads"
    /// directory. Segment files themselves are left in place.
    ///
    /// Panics on I/O failure.
    pub fn reinit(&self) {
        std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
        // init() is run for its side effect of re-creating "heads"; the
        // returned store value is discarded.
        Self::init(self.dir.clone(), self.key_size);
    }

    /// Size in bytes of the keys of every table in this store.
    pub fn key_size(&self) -> usize {
        self.key_size
    }

    /// Opens an existing store in `dir` without touching the filesystem.
    pub fn load(dir: PathBuf, key_size: usize) -> Self {
        Self {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    /// Saves `mut_table`, marks the result as a head, and caches it.
    pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
        let maybe_parent_table = mut_table.parent_file.clone();
        let table = mut_table.save_in(self)?;
        self.add_head(&table)?;
        // save_in() may return the parent unchanged (empty mutation); only
        // drop the parent's head marker if a new table was actually written.
        if let Some(parent_table) = maybe_parent_table
            && parent_table.name != table.name
        {
            self.remove_head(&parent_table);
        }
        {
            let mut locked_cache = self.cached_tables.write().unwrap();
            locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Marks `table` as a head by creating an empty marker file.
    fn add_head(&self, table: &Arc<ReadonlyTable>) -> TableStoreResult<()> {
        std::fs::write(self.dir.join("heads").join(&table.name), "")
            .map_err(TableStoreError::SaveHeads)
    }

    /// Removes the head marker for `table`; failure is deliberately
    /// ignored (best effort).
    fn remove_head(&self, table: &Arc<ReadonlyTable>) {
        std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
    }

    /// Takes the store-wide file lock used to serialize head updates.
    fn lock(&self) -> TableStoreResult<FileLock> {
        FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
    }

    /// Loads the segment file `name`, consulting and then filling the
    /// in-memory cache.
    fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
        {
            let read_locked_cached = self.cached_tables.read().unwrap();
            if let Some(table) = read_locked_cached.get(&name).cloned() {
                return Ok(table);
            }
        }
        let to_load_err = |err| TableStoreError::LoadSegment {
            name: name.clone(),
            err,
        };
        let table_file_path = self.dir.join(&name);
        let mut table_file = File::open(table_file_path).map_err(to_load_err)?;
        let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
        {
            let mut write_locked_cache = self.cached_tables.write().unwrap();
            write_locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    /// Loads every table currently marked as a head.
    fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
        let mut tables = vec![];
        for head_entry in
            std::fs::read_dir(self.dir.join("heads")).map_err(TableStoreError::LoadHeads)?
        {
            let head_file_name = head_entry.map_err(TableStoreError::LoadHeads)?.file_name();
            let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
            tables.push(table);
        }
        Ok(tables)
    }

    /// Returns the head table, creating an empty one if the store has none
    /// and merging under the lock if there are several.
    pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            // Bootstrap the store with an empty table.
            let empty_table = MutableTable::full(self.key_size);
            self.save_table(empty_table)
        } else if tables.len() == 1 {
            Ok(tables.pop().unwrap())
        } else {
            // Multiple heads: take the lock and let get_head_locked() merge
            // them into one.
            let (table, _) = self.get_head_locked()?;
            Ok(table)
        }
    }

    /// Like [`Self::get_head`], but acquires the store lock first and
    /// returns it so the caller can update the head atomically.
    pub fn get_head_locked(&self) -> TableStoreResult<(Arc<ReadonlyTable>, FileLock)> {
        let lock = self.lock()?;
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            let table = self.save_table(empty_table)?;
            return Ok((table, lock));
        }

        if tables.len() == 1 {
            return Ok((tables.pop().unwrap(), lock));
        }

        // Multiple heads (e.g. left by concurrent writers): merge them into
        // one table and drop the now-obsolete head markers.
        let mut merged_table = MutableTable::incremental(tables[0].clone());
        for other in &tables[1..] {
            merged_table.merge_in(other);
        }
        let merged_table = self.save_table(merged_table)?;
        for table in &tables[1..] {
            self.remove_head(table);
        }
        Ok((merged_table, lock))
    }

    /// Removes segment files that are neither reachable from `head` nor
    /// referenced by the in-memory cache, keeping any file whose mtime is
    /// newer than `keep_newer`.
    #[tracing::instrument(skip(self, head))]
    pub fn gc(&self, head: &Arc<ReadonlyTable>, keep_newer: SystemTime) -> Result<(), PathError> {
        let read_locked_cache = self.cached_tables.read().unwrap();
        let reachable_tables: HashSet<&str> = itertools::chain(
            head.ancestor_segments(),
            read_locked_cache.values(),
        )
        .map(|table| table.name())
        .collect();

        let remove_file_if_not_new = |entry: &fs::DirEntry| -> Result<(), PathError> {
            let path = entry.path();
            let metadata = entry.metadata().context(&path)?;
            let mtime = metadata.modified().expect("unsupported platform?");
            // Files newer than keep_newer are kept even if unreachable.
            if mtime > keep_newer {
                tracing::trace!(?path, "not removing");
                Ok(())
            } else {
                tracing::trace!(?path, "removing");
                fs::remove_file(&path).context(&path)
            }
        };

        for entry in self.dir.read_dir().context(&self.dir)? {
            let entry = entry.context(&self.dir)?;
            let file_name = entry.file_name();
            if file_name == "heads" || file_name == "lock" {
                continue;
            }
            // Segment file names are fixed-length hex hashes; skip anything
            // else (e.g. stray temp files).
            let Some(table_name) = file_name
                .to_str()
                .filter(|name| name.len() == SEGMENT_FILE_NAME_LENGTH)
            else {
                tracing::trace!(?entry, "skipping invalid file name");
                continue;
            };
            if reachable_tables.contains(table_name) {
                continue;
            }
            remove_file_if_not_new(&entry)?;
        }
        Ok(())
    }
}
627
628#[cfg(test)]
629mod tests {
630 use test_case::test_case;
631
632 use super::*;
633 use crate::tests::new_temp_dir;
634
    // An empty table, in memory or saved to disk, returns None for every
    // key, including the extremes of the 3-byte key space.
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        // Kept alive so the &dyn reference below stays valid.
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"aaa"), None);
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }
654
    // A single entry is found by exact key and nothing else matches.
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        // Kept alive so the &dyn reference below stays valid.
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }
675
    // Multiple entries added out of order are each found by exact key;
    // near-miss keys between and around them return None.
    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        // Kept alive so the &dyn reference below stays valid.
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abb"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(table.get_value(b"abe"), None);
        assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }
702
    // Builds a chain of saved segments (10 rounds of 10 entries each) and
    // checks that lookups traverse the whole parent chain correctly.
    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        for round in 0..10 {
            for i in 0..10 {
                mut_table.add_entry(
                    format!("x{i}{round}").into_bytes(),
                    format!("value {i}{round}").into_bytes(),
                );
            }
            // Save each round and continue mutating on top of the result.
            let saved_table = store.save_table(mut_table).unwrap();
            mut_table = MutableTable::incremental(saved_table);
        }

        assert_eq!(mut_table.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table.get_value(b"x.."), None);
        assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
        assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
        assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
        assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
        assert_eq!(mut_table.get_value(b"xAA"), None);
        assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
    }
732
    // Explicit merge_in(): two tables diverge from a common base; after
    // merging, entries from both sides are visible, and the merged-in
    // side's value wins for the conflicting key "mmm" (replayed last).
    #[test]
    fn stacked_table_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        let table1 = store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        mut_table2.merge_in(&table1);

        assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
        assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
        assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
    }
764
    // Implicit merge via get_head(): two saved tables leave two heads;
    // get_head() merges them. The winner for the conflicting key "mmm" is
    // unspecified, so either side's value is accepted.
    #[test]
    fn stacked_table_automatic_merge() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        let table2 = store.save_table(mut_table2).unwrap();

        // table2 alone doesn't see table1's entries before the merge.
        assert_eq!(table2.get_value(b"abd"), None);

        let merged_table = store.get_head().unwrap();
        assert_eq!(merged_table.get_value(b"\0\0\0"), None);
        assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
        let value_mmm = merged_table.get_value(b"mmm");
        assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
        assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
    }
802
    // Saving a mutation with no new entries must not lose the parent's
    // data: the head still resolves the previously stored entry.
    #[test]
    fn stacked_table_store_save_empty() {
        let temp_dir = new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);

        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        store.save_table(mut_table).unwrap();

        // Empty mutation: save_in() should return the parent unchanged.
        let mut_table = store.get_head().unwrap().start_mutation();
        store.save_table(mut_table).unwrap();

        let table = store.get_head().unwrap();
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
    }
819}