1use bincode::config::Options;
2use itertools::Itertools;
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use sstable::{SSIterator, Table, TableBuilder, TableIterator};
6use transient_btree_index::{BtreeConfig, BtreeIndex};
7
8use crate::serializer::KeyVec;
9use crate::{errors::Result, serializer::KeySerializer};
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::iter::{FusedIterator, Peekable};
13use std::marker::PhantomData;
14use std::ops::{Bound, RangeBounds};
15use std::path::Path;
16
/// Bytes in a kibibyte.
const KB: usize = 1024;
/// Bytes in a mebibyte.
pub const MB: usize = 1024 * KB;
/// Block size (4 KiB) that `custom_options` divides a byte budget by to get a block count.
const BLOCK_MAX_SIZE: usize = 4 * KB;

/// Default block-cache budget in bytes (8 MiB).
pub const DEFAULT_BLOCK_CACHE_CAPACITY: usize = 8 * MB;
23
24#[derive(Serialize, Deserialize)]
25struct Entry<K, V>
26where
27 K: Ord,
28{
29 key: K,
30 value: V,
31}
32
/// Policy that decides when the in-memory C0 table of a `DiskMap` is compacted into its C1
/// index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionStrategy {
    /// Compact as soon as C0 holds at least this many entries. Removal markers count as
    /// entries too.
    MaximumItems(usize),
}

impl Default for EvictionStrategy {
    /// Compacts once C0 holds 10 000 entries.
    fn default() -> Self {
        EvictionStrategy::MaximumItems(10_000)
    }
}
42
43pub struct DiskMap<K, V>
44where
45 K: 'static + KeySerializer + Serialize + DeserializeOwned + Clone + Send + Sync + Ord,
46 for<'de> V: 'static + Serialize + Deserialize<'de> + Clone + Send + Sync,
47{
48 eviction_strategy: EvictionStrategy,
49 block_cache_capacity: usize,
50 c0: BTreeMap<K, Option<V>>,
51 c1: Option<BtreeIndex<K, Option<V>>>,
52 c2: Option<Table>,
53 serialization: bincode::config::DefaultOptions,
54
55 c1_btree_config: BtreeConfig,
56}
57
58fn custom_options(block_cache_capacity: usize) -> sstable::Options {
59 let blocks = (block_cache_capacity / BLOCK_MAX_SIZE).max(1);
60 sstable::Options::default().with_cache_capacity(blocks)
61}
62
63impl<K, V> DiskMap<K, V>
64where
65 K: 'static + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync + Ord,
66 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
67{
68 pub fn new(
69 persisted_file: Option<&Path>,
70 eviction_strategy: EvictionStrategy,
71 block_cache_capacity: usize,
72 c1_config: BtreeConfig,
73 ) -> Result<DiskMap<K, V>> {
74 let mut disk_table = None;
75
76 if let Some(persisted_file) = persisted_file
77 && persisted_file.is_file()
78 {
79 let table = Table::new_from_file(custom_options(block_cache_capacity), persisted_file)?;
81 disk_table = Some(table);
82 }
83
84 Ok(DiskMap {
85 eviction_strategy,
86 block_cache_capacity,
87 c0: BTreeMap::default(),
88 c2: disk_table,
89 serialization: bincode::options(),
90 c1: None,
91 c1_btree_config: c1_config,
92 })
93 }
94
95 pub fn new_temporary(
96 eviction_strategy: EvictionStrategy,
97 block_cache_capacity: usize,
98 c1_config: BtreeConfig,
99 ) -> DiskMap<K, V> {
100 DiskMap {
101 eviction_strategy,
102 block_cache_capacity,
103 c0: BTreeMap::default(),
104 c2: None,
105 serialization: bincode::options(),
106 c1: None,
107 c1_btree_config: c1_config,
108 }
109 }
110
111 pub fn insert(&mut self, key: K, value: V) -> Result<()> {
112 self.c0.insert(key, Some(value));
113
114 self.evict_c0_if_necessary()?;
115
116 Ok(())
117 }
118
119 pub fn get(&self, key: &K) -> Result<Option<Cow<'_, V>>> {
120 if let Some(entry) = self.c0.get(key) {
122 if let Some(value) = entry {
123 return Ok(Some(Cow::Borrowed(value)));
124 } else {
125 return Ok(None);
128 }
129 }
130 if let Some(c1) = &self.c1
132 && let Some(entry) = c1.get(key)?
133 {
134 if let Some(value) = entry {
135 return Ok(Some(Cow::Owned(value)));
136 } else {
137 return Ok(None);
140 }
141 }
142
143 if let Some(c2) = &self.c2 {
145 let key = K::create_key(key);
146 if let Some(value) = c2.get(&key)? {
147 let value: Option<V> = self.serialization.deserialize(&value)?;
148 if let Some(value) = value {
149 return Ok(Some(Cow::Owned(value)));
150 } else {
151 return Ok(None);
153 }
154 }
155 }
156
157 Ok(None)
158 }
159
160 pub fn contains_key(&self, key: &K) -> Result<bool> {
161 if let Some(value) = self.c0.get(key) {
163 if value.is_some() {
164 return Ok(true);
165 } else {
166 return Ok(false);
168 }
169 }
170
171 if let Some(c1) = &self.c1
173 && c1.contains_key(key)?
174 {
175 return Ok(true);
176 }
177
178 if let Some(c2) = &self.c2 {
182 let mut table_it = c2.iter();
183 let key = K::create_key(key);
184 table_it.seek(&key);
185 if let Some(it_key) = table_it.current_key()
186 && it_key == key.as_ref()
187 {
188 return Ok(true);
189 }
190 }
191 Ok(false)
192 }
193
194 pub fn remove(&mut self, key: &K) -> Result<Option<V>> {
195 let existing = self.get(key)?.map(|existing| existing.into_owned());
196 if existing.is_some() {
197 self.c0.insert(key.clone(), None);
199
200 self.evict_c0_if_necessary()?;
201 }
202
203 Ok(existing)
204 }
205
206 pub fn iter(&self) -> Result<ResultIterator<'_, K, V>> {
207 if let Some(c1) = &self.c1 {
208 if self.c0.is_empty() && self.c2.is_none() {
209 let it = c1
211 .range(..)?
212 .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
213 .map(|entry| entry.map_err(|e| e.into()));
214
215 return Ok(Box::new(it));
216 }
217 } else if let Some(c2) = &self.c2 {
218 if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
219 let table_iterator = c2.iter();
220 let it = SingleTableIterator {
221 table_iterator,
222 serialization: self.serialization,
223 phantom: PhantomData,
224 };
225 return Ok(Box::new(it));
226 }
227 } else {
228 let it = self
230 .c0
231 .iter()
232 .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
233 return Ok(Box::new(it));
234 }
235
236 Ok(Box::new(self.range(..)))
238 }
239
240 pub fn range<'a, R>(&'a self, range: R) -> Box<dyn Iterator<Item = Result<(K, V)>> + 'a>
242 where
243 R: RangeBounds<K> + Clone,
244 {
245 if let Some(c1) = &self.c1 {
247 if self.c0.is_empty() && self.c2.is_none() {
248 let c1_range = match c1.range(range).map_err(|e| e.into()) {
249 Ok(c1_range) => c1_range,
250 Err(e) => return Box::new(std::iter::once(Err(e))),
251 };
252 let it = c1_range
254 .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
255 .map(|entry| entry.map_err(|e| e.into()));
256 return Box::new(it);
257 }
258 } else if let Some(c2) = &self.c2 {
259 if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
260 let mapped_start_bound: std::ops::Bound<KeyVec> = match range.start_bound() {
261 Bound::Included(end) => Bound::Included(K::create_key(end)),
262 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
263 Bound::Unbounded => Bound::Unbounded,
264 };
265
266 let mapped_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
267 Bound::Included(end) => Bound::Included(K::create_key(end)),
268 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
269 Bound::Unbounded => Bound::Unbounded,
270 };
271
272 return Box::new(SimplifiedRange::new(
274 mapped_start_bound,
275 mapped_end_bound,
276 c2,
277 self.serialization,
278 ));
279 }
280 } else {
281 let it = self
284 .c0
285 .range(range)
286 .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
287 return Box::new(it);
288 }
289 match CombinedRange::new(
291 range,
292 &self.c0,
293 self.c1.as_ref(),
294 self.c2.as_ref(),
295 self.serialization,
296 ) {
297 Ok(result) => Box::new(result),
298 Err(e) => Box::new(std::iter::once(Err(e))),
299 }
300 }
301
302 pub fn is_empty(&self) -> Result<bool> {
303 if self.c0.is_empty() && self.c1.is_none() && self.c2.is_none() {
304 return Ok(true);
305 }
306 let mut it = self.iter()?;
307 Ok(it.next().is_none())
308 }
309
310 pub fn clear(&mut self) {
311 self.c0.clear();
312 self.c1 = None;
313 self.c2 = None;
314 }
315
316 pub fn write_to(&self, location: &Path) -> Result<()> {
317 if let Some(parent) = location.parent() {
319 std::fs::create_dir_all(parent)?;
320 }
321 let out_file = std::fs::OpenOptions::new()
323 .write(true)
324 .read(true)
325 .create(true)
326 .truncate(true)
327 .open(location)?;
328 let mut builder = TableBuilder::new(custom_options(self.block_cache_capacity), out_file);
329 for entry in self.iter()? {
330 let (key, value) = entry?;
331 let key = key.create_key();
332 let value = Some(value);
333 builder.add(&key, &self.serialization.serialize(&value)?)?;
334 }
335 builder.finish()?;
336
337 Ok(())
338 }
339
340 pub fn compact(&mut self) -> Result<()> {
342 debug!("Evicting C0 and merging it with existing C1 to a temporary file");
343
344 if self.c1.is_none() {
345 let c1 = BtreeIndex::with_capacity(self.c1_btree_config.clone(), self.c0.len())?;
346 self.c1 = Some(c1);
347 }
348
349 if let Some(c1) = self.c1.as_mut() {
350 let mut c0 = BTreeMap::new();
351 std::mem::swap(&mut self.c0, &mut c0);
352 for (k, v) in c0.into_iter() {
353 c1.insert(k, v)?;
354 }
355 }
356
357 debug!("Finished evicting C0");
358 Ok(())
359 }
360
361 fn evict_c0_if_necessary(&mut self) -> Result<()> {
362 let evict_c0 = match self.eviction_strategy {
363 EvictionStrategy::MaximumItems(n) => self.c0.len() >= n,
364 };
365
366 if evict_c0 {
367 self.compact()?;
368 }
369
370 Ok(())
371 }
372}
373
374struct SingleTableIterator<K, V> {
376 table_iterator: TableIterator,
377 serialization: bincode::config::DefaultOptions,
378 phantom: std::marker::PhantomData<(K, V)>,
379}
380
381impl<K, V> Iterator for SingleTableIterator<K, V>
382where
383 for<'de> K: 'static + Clone + KeySerializer + Send,
384 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
385{
386 type Item = Result<(K, V)>;
387
388 fn next(&mut self) -> Option<Self::Item> {
389 while let Some((key, value)) = self.table_iterator.next() {
390 let key = match K::parse_key(&key) {
391 Ok(key) => key,
392 Err(e) => return Some(Err(e.into())),
393 };
394 let value: Option<V> = match self.serialization.deserialize(&value) {
395 Ok(value) => value,
396 Err(e) => return Some(Err(e.into())),
397 };
398 if let Some(value) = value {
399 return Some(Ok((key, value)));
400 }
401 }
402 None
403 }
404}
405
406impl<K, V> FusedIterator for SingleTableIterator<K, V>
407where
408 K: 'static + Clone + KeySerializer + Send,
409 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
410{
411}
412
/// Boxed, dynamically dispatched iterator over fallible `(key, value)` pairs that may borrow
/// data for the lifetime `'a`.
type ResultIterator<'a, K, V> = Box<dyn Iterator<Item = Result<(K, V)>> + 'a>;
414
415pub struct CombinedRange<'a, K, V>
416where
417 for<'de> K: 'static + Clone + KeySerializer + Send,
418 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
419{
420 c0_iterator: Peekable<std::collections::btree_map::Range<'a, K, Option<V>>>,
421 c1_iterator: Peekable<ResultIterator<'a, K, Option<V>>>,
422 c2_iterator: Peekable<ResultIterator<'a, K, V>>,
423}
424
425impl<'a, K, V> CombinedRange<'a, K, V>
426where
427 for<'de> K: 'static + Clone + KeySerializer + Serialize + Deserialize<'de> + Send + Sync + Ord,
428 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
429{
430 fn new<R: RangeBounds<K> + Clone>(
431 range: R,
432 c0: &'a BTreeMap<K, Option<V>>,
433 c1: Option<&'a BtreeIndex<K, Option<V>>>,
434 c2: Option<&Table>,
435 serialization: bincode::config::DefaultOptions,
436 ) -> Result<CombinedRange<'a, K, V>> {
437 let c1_iterator: Box<dyn Iterator<Item = Result<(K, Option<V>)>>> = if let Some(c1) = c1 {
438 let it = c1
439 .range(range.clone())?
440 .map(|entry| entry.map_err(|e| e.into()));
441 Box::new(it)
442 } else {
443 Box::new(std::iter::empty())
444 };
445
446 let c2: Box<dyn Iterator<Item = Result<(K, V)>>> = if let Some(c2) = c2 {
447 let table_start_bound = match range.start_bound() {
448 Bound::Included(end) => Bound::Included(K::create_key(end)),
449 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
450 Bound::Unbounded => Bound::Unbounded,
451 };
452
453 let table_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
454 Bound::Included(end) => Bound::Included(K::create_key(end)),
455 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
456 Bound::Unbounded => Bound::Unbounded,
457 };
458
459 let it = SimplifiedRange::new(table_start_bound, table_end_bound, c2, serialization);
460 Box::new(it)
461 } else {
462 Box::new(std::iter::empty())
463 };
464
465 Ok(CombinedRange {
466 c0_iterator: c0.range(range).peekable(),
467 c1_iterator: c1_iterator.peekable(),
468 c2_iterator: c2.peekable(),
469 })
470 }
471}
472
473impl<K, V> Iterator for CombinedRange<'_, K, V>
474where
475 K: Ord,
476 for<'de> K: 'static + Clone + KeySerializer + Send,
477 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
478{
479 type Item = Result<(K, V)>;
480
481 fn next(&mut self) -> Option<Self::Item> {
482 while self.c0_iterator.peek().is_some()
483 || self.c1_iterator.peek().is_some()
484 || self.c2_iterator.peek().is_some()
485 {
486 let c0 = self.c0_iterator.peek().map(|(k, _v)| Some(*k));
488 let c1 = self.c1_iterator.peek().map(|entry| match entry {
489 Ok((k, _v)) => Some(k),
490 Err(_) => None,
491 });
492 let c3 = self.c2_iterator.peek().map(|entry| match entry {
493 Ok((k, _v)) => Some(k),
494 Err(_) => None,
495 });
496
497 let min_key = vec![c0, c1, c3].into_iter().flatten().min();
498 if let Some(min_key) = min_key {
499 let c0_is_min = c0 == Some(min_key);
500 let c1_is_min = c1 == Some(min_key);
501 let c3_is_min = c3 == Some(min_key);
502
503 let c0 = if c0_is_min {
505 self.c0_iterator.next()
506 } else {
507 None
508 };
509 let c1 = if c1_is_min {
510 self.c1_iterator.next()
511 } else {
512 None
513 };
514 let c3 = if c3_is_min {
515 self.c2_iterator.next()
516 } else {
517 None
518 };
519
520 if let Some((k, v)) = c0 {
522 if let Some(v) = v {
523 return Some(Ok((k.clone(), v.clone())));
524 } else {
525 continue;
527 }
528 } else if let Some(entry) = c1 {
529 match entry {
530 Ok((k, v)) => {
531 if let Some(v) = v {
532 return Some(Ok((k, v)));
533 } else {
534 continue;
536 }
537 }
538 Err(e) => {
539 return Some(Err(e));
540 }
541 };
542 } else if let Some(entry) = c3 {
543 match entry {
544 Ok((k, v)) => {
545 return Some(Ok((k, v)));
546 }
547 Err(e) => {
548 return Some(Err(e));
549 }
550 }
551 }
552 }
553 }
554 None
555 }
556}
557
558impl<K, V> FusedIterator for CombinedRange<'_, K, V>
559where
560 K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send,
561 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
562{
563}
564
565impl<K, V> Default for DiskMap<K, V>
566where
567 K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync,
568 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
569{
570 fn default() -> Self {
571 DiskMap::new_temporary(
572 EvictionStrategy::default(),
573 DEFAULT_BLOCK_CACHE_CAPACITY,
574 BtreeConfig::default(),
575 )
576 }
577}
578
579struct SimplifiedRange<K, V> {
581 range_start: Bound<KeyVec>,
582 range_end: Bound<KeyVec>,
583 table_it: TableIterator,
584 exhausted: bool,
585 serialization: bincode::config::DefaultOptions,
586
587 current_key: Vec<u8>,
588 current_value: Vec<u8>,
589
590 phantom: std::marker::PhantomData<(K, V)>,
591}
592
593impl<K, V> SimplifiedRange<K, V>
594where
595 for<'de> K: 'static + Clone + KeySerializer + Send,
596 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
597{
598 fn new(
599 range_start: Bound<KeyVec>,
600 range_end: Bound<KeyVec>,
601 disk_table: &Table,
602 serialization: bincode::config::DefaultOptions,
603 ) -> SimplifiedRange<K, V> {
604 let mut table_it = disk_table.iter();
605 let mut exhausted = false;
606
607 match &range_start {
609 Bound::Included(start) => {
610 let start: &[u8] = start;
611 let mut key = Vec::default();
612 let mut value = Vec::default();
613
614 table_it.seek(start);
615
616 if table_it.valid() && table_it.current(&mut key, &mut value) {
617 let key: &[u8] = &key;
618 let start_included = match &range_start {
620 Bound::Included(start) => {
621 let start: &[u8] = start;
622 key >= start
623 }
624 Bound::Excluded(start) => {
625 let start: &[u8] = start;
626 key > start
627 }
628 Bound::Unbounded => true,
629 };
630 let end_included = match &range_end {
631 Bound::Included(end) => {
632 let end: &[u8] = end;
633 key <= end
634 }
635 Bound::Excluded(end) => {
636 let end: &[u8] = end;
637 key < end
638 }
639 Bound::Unbounded => true,
640 };
641 if !start_included || !end_included {
642 exhausted = true;
643 }
644 } else {
645 exhausted = true;
647 }
648 }
649 Bound::Excluded(start_bound) => {
650 let start_bound: &[u8] = start_bound;
651
652 let mut key: Vec<u8> = Vec::default();
653 let mut value = Vec::default();
654
655 table_it.seek(start_bound);
656 if table_it.valid() && table_it.current(&mut key, &mut value) {
657 let key: &[u8] = &key;
658 if key == start_bound {
659 table_it.advance();
661 }
662 }
663
664 if table_it.valid() && table_it.current(&mut key, &mut value) {
666 let key: &[u8] = &key;
667
668 let start_included = match &range_start {
670 Bound::Included(start) => {
671 let start: &[u8] = start;
672 key >= start
673 }
674 Bound::Excluded(start) => {
675 let start: &[u8] = start;
676 key > start
677 }
678 Bound::Unbounded => true,
679 };
680 let end_included = match &range_end {
681 Bound::Included(end) => {
682 let end: &[u8] = end;
683 key <= end
684 }
685 Bound::Excluded(end) => {
686 let end: &[u8] = end;
687 key < end
688 }
689 Bound::Unbounded => true,
690 };
691 if !start_included || !end_included {
692 exhausted = true;
693 }
694 } else {
695 exhausted = true;
697 }
698 }
699 Bound::Unbounded => {
700 table_it.seek_to_first();
701
702 if !table_it.valid() {
703 exhausted = true;
704 }
705 }
706 };
707
708 SimplifiedRange {
709 range_start,
710 range_end,
711 exhausted,
712 table_it,
713 serialization,
714 current_key: Vec::new(),
715 current_value: Vec::new(),
716 phantom: std::marker::PhantomData,
717 }
718 }
719
720 fn range_contains(&self, item: &[u8]) -> bool {
721 (match &self.range_start {
722 Bound::Included(start) => start.as_slice() <= item,
723 Bound::Excluded(start) => start.as_slice() < item,
724 Bound::Unbounded => true,
725 }) && (match &self.range_end {
726 Bound::Included(end) => item <= end.as_ref(),
727 Bound::Excluded(end) => item < end.as_ref(),
728 Bound::Unbounded => true,
729 })
730 }
731}
732
733impl<K, V> Iterator for SimplifiedRange<K, V>
734where
735 for<'de> K: 'static + Clone + KeySerializer + Send,
736 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
737{
738 type Item = Result<(K, V)>;
739
740 fn next(&mut self) -> Option<Self::Item> {
741 while !self.exhausted && self.table_it.valid() {
742 if self
743 .table_it
744 .current(&mut self.current_key, &mut self.current_value)
745 {
746 if self.range_contains(&self.current_key) {
747 let value: Option<V> = match self.serialization.deserialize(&self.current_value)
748 {
749 Ok(value) => value,
750 Err(e) => return Some(Err(e.into())),
751 };
752
753 self.table_it.advance();
754
755 if let Some(value) = value {
756 let key = match K::parse_key(&self.current_key) {
757 Ok(key) => key,
758 Err(e) => return Some(Err(e.into())),
759 };
760 return Some(Ok((key, value)));
761 }
762 } else {
763 self.exhausted = true;
764 }
765 }
766 }
767 None
768 }
769}
770
771impl<K, V> FusedIterator for SimplifiedRange<K, V>
772where
773 K: 'static + Clone + KeySerializer + Send,
774 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
775{
776}
777
// Unit tests live in a separate `tests` module file (declared without an inline body).
#[cfg(test)]
mod tests;