use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io;
use std::io::{Cursor, Read, Write};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use blake2::{Blake2b512, Digest};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use tempfile::NamedTempFile;
use thiserror::Error;

use crate::file_util::persist_content_addressed_temp_file;
use crate::lock::FileLock;

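/// One segment of a stacked table: either an on-disk `ReadonlyTable` or an
/// in-memory `MutableTable`. The provided methods fall back to the parent file
/// when a key is not found in the local segment.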
pub trait TableSegment {
    fn segment_num_entries(&self) -> usize;
    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>>;
    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
    fn segment_add_entries_to(&self, mut_table: &mut MutableTable);

    fn num_entries(&self) -> usize {
        if let Some(parent_file) = self.segment_parent_file() {
            parent_file.num_entries() + self.segment_num_entries()
        } else {
            self.segment_num_entries()
        }
    }

    fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> {
        if let Some(value) = self.segment_get_value(key) {
            return Some(value);
        }
        let parent_file = self.segment_parent_file()?;
        let parent_file: &ReadonlyTable = parent_file.as_ref();
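        // SAFETY: The parent table is kept alive by the `Arc` that `self` holds on
        // to, so extending the borrow to `'a` (the lifetime of `&'a self`) is sound.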
        let parent_file: &'a ReadonlyTable = unsafe { std::mem::transmute(parent_file) };
        parent_file.get_value(key)
    }
}

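/// An immutable, on-disk table segment. The file is content-addressed (its
/// name is the hash of its contents) and may build on a parent file, forming a
/// stack of segments. `num_local_entries`, `index`, and `values` describe only
/// this file, not the parent.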
pub struct ReadonlyTable {
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    name: String,
    num_local_entries: usize,
    index: Vec<u8>,
    values: Vec<u8>,
}

impl ReadonlyTable {
    fn load_from(
        file: &mut dyn Read,
        store: &TableStore,
        name: String,
        key_size: usize,
    ) -> TableStoreResult<Arc<ReadonlyTable>> {
        let parent_filename_len = file.read_u32::<LittleEndian>()?;
        let maybe_parent_file = if parent_filename_len > 0 {
            let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
            file.read_exact(&mut parent_filename_bytes)?;
            let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
            let parent_file = store.load_table(parent_filename)?;
            Some(parent_file)
        } else {
            None
        };
        let num_local_entries = file.read_u32::<LittleEndian>()? as usize;
        let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size);
        let mut data = vec![];
        file.read_to_end(&mut data)?;
        let values = data.split_off(index_size);
        let index = data;
        Ok(Arc::new(ReadonlyTable {
            key_size,
            parent_file: maybe_parent_file,
            name,
            num_local_entries,
            index,
            values,
        }))
    }

    pub fn start_mutation(self: &Arc<Self>) -> MutableTable {
        MutableTable::incremental(self.clone())
    }

    fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
        if pos == self.num_local_entries {
            self.values.len()
        } else {
            ReadonlyTableIndexEntry::new(self, pos).value_offset()
        }
    }

    fn segment_value_by_pos(&self, pos: usize) -> &[u8] {
        &self.values
            [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)]
    }
}

impl TableSegment for ReadonlyTable {
    fn segment_num_entries(&self) -> usize {
        self.num_local_entries
    }

    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
        self.parent_file.as_ref()
    }

    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
        let mut low_pos = 0;
        let mut high_pos = self.num_local_entries;
        loop {
            if high_pos == low_pos {
                return None;
            }
            let mid_pos = (low_pos + high_pos) / 2;
            let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos);
            match key.cmp(mid_entry.key()) {
                Ordering::Less => {
                    high_pos = mid_pos;
                }
                Ordering::Equal => {
                    return Some(self.segment_value_by_pos(mid_pos));
                }
                Ordering::Greater => {
                    low_pos = mid_pos + 1;
                }
            }
        }
    }

    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
        for pos in 0..self.num_local_entries {
            let entry = ReadonlyTableIndexEntry::new(self, pos);
            mut_table.add_entry(
                entry.key().to_vec(),
                self.segment_value_by_pos(pos).to_vec(),
            );
        }
    }
}

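/// A view of one fixed-size index entry in a `ReadonlyTable`: the key bytes
/// followed by a little-endian u32 offset into the values section.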
struct ReadonlyTableIndexEntry<'table> {
    data: &'table [u8],
}

impl<'table> ReadonlyTableIndexEntry<'table> {
    fn new(table: &'table ReadonlyTable, pos: usize) -> Self {
        let entry_size = ReadonlyTableIndexEntry::size(table.key_size);
        let offset = entry_size * pos;
        let data = &table.index[offset..offset + entry_size];
        ReadonlyTableIndexEntry { data }
    }

    fn size(key_size: usize) -> usize {
        key_size + 4
    }

    fn key(&self) -> &'table [u8] {
        &self.data[0..self.data.len() - 4]
    }

    fn value_offset(&self) -> usize {
        (&self.data[self.data.len() - 4..self.data.len()])
            .read_u32::<LittleEndian>()
            .unwrap() as usize
    }
}

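/// An in-memory table segment that collects new entries on top of an optional
/// parent `ReadonlyTable`.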
pub struct MutableTable {
    key_size: usize,
    parent_file: Option<Arc<ReadonlyTable>>,
    entries: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl MutableTable {
    fn full(key_size: usize) -> Self {
        Self {
            key_size,
            parent_file: None,
            entries: BTreeMap::new(),
        }
    }

    fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
        let key_size = parent_file.key_size;
        Self {
            key_size,
            parent_file: Some(parent_file),
            entries: BTreeMap::new(),
        }
    }

    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>) {
        assert_eq!(key.len(), self.key_size);
        self.entries.insert(key, value);
    }

    fn add_entries_from(&mut self, other: &dyn TableSegment) {
        other.segment_add_entries_to(self);
    }

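    /// Merges in the entries of `other` and of any of its ancestors that are
    /// not shared with `self`, found by walking both parent chains until a
    /// common file (or the root) is reached.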
    fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
        let mut maybe_own_ancestor = self.parent_file.clone();
        let mut maybe_other_ancestor = Some(other.clone());
        let mut files_to_add = vec![];
        loop {
            if maybe_other_ancestor.is_none() {
                break;
            }
            let other_ancestor = maybe_other_ancestor.as_ref().unwrap();
            if maybe_own_ancestor.is_none() {
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
                continue;
            }
            let own_ancestor = maybe_own_ancestor.as_ref().unwrap();
            if own_ancestor.name == other_ancestor.name {
                break;
            }
            if own_ancestor.num_entries() < other_ancestor.num_entries() {
                files_to_add.push(other_ancestor.clone());
                maybe_other_ancestor = other_ancestor.parent_file.clone();
            } else {
                maybe_own_ancestor = own_ancestor.parent_file.clone();
            }
        }

        for file in files_to_add.iter().rev() {
            self.add_entries_from(file.as_ref());
        }
    }

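    /// Serializes the table into the format read by `ReadonlyTable::load_from`:
    /// the parent file name (length-prefixed), the number of local entries, a
    /// sorted index of (key, value offset) pairs, and the concatenated values.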
    fn serialize(self) -> Vec<u8> {
        let mut buf = vec![];

        if let Some(parent_file) = &self.parent_file {
            buf.write_u32::<LittleEndian>(parent_file.name.len() as u32)
                .unwrap();
            buf.write_all(parent_file.name.as_bytes()).unwrap();
        } else {
            buf.write_u32::<LittleEndian>(0).unwrap();
        }

        buf.write_u32::<LittleEndian>(self.entries.len() as u32)
            .unwrap();

        let mut value_offset = 0;
        for (key, value) in &self.entries {
            buf.write_all(key).unwrap();
            buf.write_u32::<LittleEndian>(value_offset).unwrap();
            value_offset += value.len() as u32;
        }
        for value in self.entries.values() {
            buf.write_all(value).unwrap();
        }
        buf
    }

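    /// If this table has at least half as many entries as its parent file,
    /// returns a new table containing the parent's entries as well. Applied
    /// recursively up the parent chain, this keeps the number of stacked files
    /// logarithmic in the total number of entries.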
    fn maybe_squash_with_ancestors(self) -> MutableTable {
        let mut num_new_entries = self.entries.len();
        let mut files_to_squash = vec![];
        let mut maybe_parent_file = self.parent_file.clone();
        let mut squashed;
        loop {
            match maybe_parent_file {
                Some(parent_file) => {
                    if 2 * num_new_entries < parent_file.num_local_entries {
                        squashed = MutableTable::incremental(parent_file);
                        break;
                    }
                    num_new_entries += parent_file.num_local_entries;
                    files_to_squash.push(parent_file.clone());
                    maybe_parent_file = parent_file.parent_file.clone();
                }
                None => {
                    squashed = MutableTable::full(self.key_size);
                    break;
                }
            }
        }

        if files_to_squash.is_empty() {
            return self;
        }

        for parent_file in files_to_squash.iter().rev() {
            squashed.add_entries_from(parent_file.as_ref());
        }
        squashed.add_entries_from(&self);
        squashed
    }

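    /// Writes the (possibly squashed) table to a content-addressed file in
    /// `store`, named by the Blake2b-512 hash of its contents, and reloads it
    /// as a `ReadonlyTable`. A table with no new entries just returns its
    /// parent file.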
    fn save_in(self, store: &TableStore) -> TableStoreResult<Arc<ReadonlyTable>> {
        if self.entries.is_empty() && self.parent_file.is_some() {
            return Ok(self.parent_file.unwrap());
        }

        let buf = self.maybe_squash_with_ancestors().serialize();
        let mut hasher = Blake2b512::new();
        hasher.update(&buf);
        let file_id_hex = hex::encode(hasher.finalize());
        let file_path = store.dir.join(&file_id_hex);

        let mut temp_file = NamedTempFile::new_in(&store.dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(&buf)?;
        persist_content_addressed_temp_file(temp_file, file_path)?;

        let mut cursor = Cursor::new(&buf);
        ReadonlyTable::load_from(&mut cursor, store, file_id_hex, store.key_size)
    }
}

impl TableSegment for MutableTable {
    fn segment_num_entries(&self) -> usize {
        self.entries.len()
    }

    fn segment_parent_file(&self) -> Option<&Arc<ReadonlyTable>> {
        self.parent_file.as_ref()
    }

    fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> {
        self.entries.get(key).map(Vec::as_slice)
    }

    fn segment_add_entries_to(&self, mut_table: &mut MutableTable) {
        for (key, value) in &self.entries {
            mut_table.add_entry(key.clone(), value.clone());
        }
    }
}

#[derive(Debug, Error)]
#[error(transparent)]
pub enum TableStoreError {
    IoError(#[from] io::Error),
    PersistError(#[from] tempfile::PersistError),
}

pub type TableStoreResult<T> = Result<T, TableStoreError>;

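/// A directory-backed store of stacked tables. Table files are
/// content-addressed, the current heads are tracked as empty files in a
/// `heads` subdirectory, and loaded tables are cached in memory.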
pub struct TableStore {
    dir: PathBuf,
    key_size: usize,
    cached_tables: RwLock<HashMap<String, Arc<ReadonlyTable>>>,
}

impl TableStore {
    pub fn init(dir: PathBuf, key_size: usize) -> Self {
        std::fs::create_dir(dir.join("heads")).unwrap();
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    pub fn reinit(&self) {
        std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
        TableStore::init(self.dir.clone(), self.key_size);
    }

    pub fn key_size(&self) -> usize {
        self.key_size
    }

    pub fn load(dir: PathBuf, key_size: usize) -> Self {
        TableStore {
            dir,
            key_size,
            cached_tables: Default::default(),
        }
    }

    pub fn save_table(&self, mut_table: MutableTable) -> TableStoreResult<Arc<ReadonlyTable>> {
        let maybe_parent_table = mut_table.parent_file.clone();
        let table = mut_table.save_in(self)?;
        self.add_head(&table)?;
        if let Some(parent_table) = maybe_parent_table {
            self.remove_head(&parent_table);
        }
        {
            let mut locked_cache = self.cached_tables.write().unwrap();
            locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    fn add_head(&self, table: &Arc<ReadonlyTable>) -> std::io::Result<()> {
        std::fs::write(self.dir.join("heads").join(&table.name), "")
    }

    fn remove_head(&self, table: &Arc<ReadonlyTable>) {
        std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
    }

    fn lock(&self) -> FileLock {
        FileLock::lock(self.dir.join("lock"))
    }

    fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
        {
            let read_locked_cached = self.cached_tables.read().unwrap();
            if let Some(table) = read_locked_cached.get(&name).cloned() {
                return Ok(table);
            }
        }
        let table_file_path = self.dir.join(&name);
        let mut table_file = File::open(table_file_path)?;
        let table = ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)?;
        {
            let mut write_locked_cache = self.cached_tables.write().unwrap();
            write_locked_cache.insert(table.name.clone(), table.clone());
        }
        Ok(table)
    }

    fn get_head_tables(&self) -> TableStoreResult<Vec<Arc<ReadonlyTable>>> {
        let mut tables = vec![];
        for head_entry in std::fs::read_dir(self.dir.join("heads"))? {
            let head_file_name = head_entry?.file_name();
            let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
            tables.push(table);
        }
        Ok(tables)
    }

    pub fn get_head(&self) -> TableStoreResult<Arc<ReadonlyTable>> {
        let mut tables = self.get_head_tables()?;

        if tables.is_empty() {
            let empty_table = MutableTable::full(self.key_size);
            self.save_table(empty_table)
        } else if tables.len() == 1 {
            Ok(tables.pop().unwrap())
        } else {
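            // There are multiple heads: take the lock and re-read them, since another
            // process may have merged them in the meantime. If several heads remain,
            // merge them into a single table and make that the only head.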
            let _lock = self.lock();
            let mut tables = self.get_head_tables()?;

            if tables.is_empty() {
                let empty_table = MutableTable::full(self.key_size);
                return self.save_table(empty_table);
            }

            if tables.len() == 1 {
                return Ok(tables.pop().unwrap());
            }

            let mut merged_table = MutableTable::incremental(tables[0].clone());
            for other in &tables[1..] {
                merged_table.merge_in(other);
            }
            let merged_table = self.save_table(merged_table)?;
            for table in &tables[1..] {
                self.remove_head(table);
            }
            Ok(merged_table)
        }
    }
}

#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_empty(on_disk: bool) {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut_table = store.get_head().unwrap().start_mutation();
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"aaa"), None);
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_single_key(on_disk: bool) {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test_case(false; "memory")]
    #[test_case(true; "file")]
    fn stacked_table_multiple_keys(on_disk: bool) {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        let mut _saved_table = None;
        let table: &dyn TableSegment = if on_disk {
            _saved_table = Some(store.save_table(mut_table).unwrap());
            _saved_table.as_ref().unwrap().as_ref()
        } else {
            &mut_table
        };

        assert_eq!(table.get_value(b"\0\0\0"), None);
        assert_eq!(table.get_value(b"abb"), None);
        assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(table.get_value(b"abe"), None);
        assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_multiple_keys_with_parent_file() {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_table = store.get_head().unwrap().start_mutation();
        mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        for round in 0..10 {
            for i in 0..10 {
                mut_table.add_entry(
                    format!("x{i}{round}").into_bytes(),
                    format!("value {i}{round}").into_bytes(),
                );
            }
            let saved_table = store.save_table(mut_table).unwrap();
            mut_table = MutableTable::incremental(saved_table);
        }

        assert_eq!(mut_table.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table.get_value(b"x.."), None);
        assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice()));
        assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice()));
        assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice()));
        assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice()));
        assert_eq!(mut_table.get_value(b"xAA"), None);
        assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_merge() {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        let table1 = store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        mut_table2.merge_in(&table1);

        assert_eq!(mut_table2.get_value(b"\0\0\0"), None);
        assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice()));
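        // `merge_in` applies table1's entries after mut_table2's own, so table1's
        // value for "mmm" wins.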
        assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice()));
        assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
    }

    #[test]
    fn stacked_table_automatic_merge() {
        let temp_dir = testutils::new_temp_dir();
        let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
        let mut mut_base_table = store.get_head().unwrap().start_mutation();
        mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
        let base_table = store.save_table(mut_base_table).unwrap();

        let mut mut_table1 = MutableTable::incremental(base_table.clone());
        mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
        mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
        mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
        store.save_table(mut_table1).unwrap();
        let mut mut_table2 = MutableTable::incremental(base_table);
        mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
        mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
        mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec());
        let table2 = store.save_table(mut_table2).unwrap();

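        // table2 was saved without merging in table1, so it doesn't see table1's
        // entries.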
        assert_eq!(table2.get_value(b"abd"), None);

        let merged_table = store.get_head().unwrap();
        assert_eq!(merged_table.get_value(b"\0\0\0"), None);
        assert_eq!(merged_table.get_value(b"abc"), Some(b"value1".as_slice()));
        assert_eq!(merged_table.get_value(b"abd"), Some(b"value 2".as_slice()));
        assert_eq!(merged_table.get_value(b"abe"), Some(b"value 4".as_slice()));
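        // Both sides wrote a value for "mmm"; which one ends up in the merged table
        // is not defined, so accept either.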
        let value_mmm = merged_table.get_value(b"mmm");
        assert!(value_mmm == Some(b"side 1".as_slice()) || value_mmm == Some(b"side 2".as_slice()));
        assert_eq!(merged_table.get_value(b"yyy"), Some(b"val5".as_slice()));
        assert_eq!(merged_table.get_value(b"zzz"), Some(b"val3".as_slice()));
        assert_eq!(merged_table.get_value(b"\xff\xff\xff"), None);
    }
}