1use bincode::config::Options;
2use itertools::Itertools;
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use sstable::{SSIterator, Table, TableBuilder, TableIterator};
6use transient_btree_index::{BtreeConfig, BtreeIndex};
7
8use crate::serializer::KeyVec;
9use crate::{errors::Result, serializer::KeySerializer};
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::iter::{FusedIterator, Peekable};
13use std::marker::PhantomData;
14use std::ops::{Bound, RangeBounds};
15use std::path::Path;
16
/// Number of bytes in one kibibyte (2^10).
const KB: usize = 1 << 10;
/// Number of bytes in one mebibyte (`KB * KB`, i.e. 2^20).
pub const MB: usize = KB * KB;
/// Block size in bytes; `custom_options` divides a cache capacity by this value to get a
/// number of blocks.
const BLOCK_MAX_SIZE: usize = 4 * KB;

/// Block cache capacity used by `DiskMap::default()` (`8 * MB`).
pub const DEFAULT_BLOCK_CACHE_CAPACITY: usize = 8 * MB;
23
/// Decides when the in-memory level (C0) of a `DiskMap` is moved to the next level (C1).
pub enum EvictionStrategy {
    /// Evict as soon as C0 holds at least this many entries (deletion markers count as
    /// entries, since they are stored in C0 as well).
    MaximumItems(usize),
}
27
28impl Default for EvictionStrategy {
29 fn default() -> Self {
30 EvictionStrategy::MaximumItems(10_000)
31 }
32}
33
/// A sorted key/value map that is split into three levels:
///
/// * `c0`: an in-memory `BTreeMap` that receives all writes,
/// * `c1`: a `BtreeIndex` that receives the content of `c0` when it is evicted,
/// * `c2`: an optional sstable `Table` opened from a persisted file.
///
/// Lookups consult the levels in the order C0, C1, C2. In C0 and C1 a value of `None` is a
/// deletion marker that hides entries for the same key in the later levels.
pub struct DiskMap<K, V>
where
    K: 'static + KeySerializer + Serialize + DeserializeOwned + Clone + Send + Sync + Ord,
    for<'de> V: 'static + Serialize + Deserialize<'de> + Clone + Send + Sync,
{
    /// Determines when `c0` is moved into `c1`.
    eviction_strategy: EvictionStrategy,
    /// Cache capacity handed to `custom_options` whenever an sstable is opened or written.
    block_cache_capacity: usize,
    /// In-memory level; `None` values mark deleted keys.
    c0: BTreeMap<K, Option<V>>,
    /// Second level, created lazily by `compact()`; `None` values mark deleted keys.
    c1: Option<BtreeIndex<K, Option<V>>>,
    /// Table loaded from the persisted file given to `new()`, if that file existed.
    c2: Option<Table>,
    /// bincode options used to (de)serialize the values stored in sstables.
    serialization: bincode::config::DefaultOptions,

    /// Configuration used when `c1` is created.
    c1_btree_config: BtreeConfig,
}
48
49fn custom_options(block_cache_capacity: usize) -> sstable::Options {
50 let blocks = (block_cache_capacity / BLOCK_MAX_SIZE).max(1);
51 sstable::Options::default().with_cache_capacity(blocks)
52}
53
54impl<K, V> DiskMap<K, V>
55where
56 K: 'static + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync + Ord,
57 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
58{
59 pub fn new(
60 persisted_file: Option<&Path>,
61 eviction_strategy: EvictionStrategy,
62 block_cache_capacity: usize,
63 c1_config: BtreeConfig,
64 ) -> Result<DiskMap<K, V>> {
65 let mut disk_table = None;
66
67 if let Some(persisted_file) = persisted_file
68 && persisted_file.is_file()
69 {
70 let table = Table::new_from_file(custom_options(block_cache_capacity), persisted_file)?;
72 disk_table = Some(table);
73 }
74
75 Ok(DiskMap {
76 eviction_strategy,
77 block_cache_capacity,
78 c0: BTreeMap::default(),
79 c2: disk_table,
80 serialization: bincode::options(),
81 c1: None,
82 c1_btree_config: c1_config,
83 })
84 }
85
86 pub fn new_temporary(
87 eviction_strategy: EvictionStrategy,
88 block_cache_capacity: usize,
89 c1_config: BtreeConfig,
90 ) -> DiskMap<K, V> {
91 DiskMap {
92 eviction_strategy,
93 block_cache_capacity,
94 c0: BTreeMap::default(),
95 c2: None,
96 serialization: bincode::options(),
97 c1: None,
98 c1_btree_config: c1_config,
99 }
100 }
101
102 pub fn insert(&mut self, key: K, value: V) -> Result<()> {
103 self.c0.insert(key, Some(value));
104
105 self.evict_c0_if_necessary()?;
106
107 Ok(())
108 }
109
110 pub fn get(&self, key: &K) -> Result<Option<Cow<'_, V>>> {
111 if let Some(entry) = self.c0.get(key) {
113 if let Some(value) = entry {
114 return Ok(Some(Cow::Borrowed(value)));
115 } else {
116 return Ok(None);
119 }
120 }
121 if let Some(c1) = &self.c1
123 && let Some(entry) = c1.get(key)?
124 {
125 if let Some(value) = entry {
126 return Ok(Some(Cow::Owned(value)));
127 } else {
128 return Ok(None);
131 }
132 }
133
134 if let Some(c2) = &self.c2 {
136 let key = K::create_key(key);
137 if let Some(value) = c2.get(&key)? {
138 let value: Option<V> = self.serialization.deserialize(&value)?;
139 if let Some(value) = value {
140 return Ok(Some(Cow::Owned(value)));
141 } else {
142 return Ok(None);
144 }
145 }
146 }
147
148 Ok(None)
149 }
150
151 pub fn contains_key(&self, key: &K) -> Result<bool> {
152 if let Some(value) = self.c0.get(key) {
154 if value.is_some() {
155 return Ok(true);
156 } else {
157 return Ok(false);
159 }
160 }
161
162 if let Some(c1) = &self.c1
164 && c1.contains_key(key)?
165 {
166 return Ok(true);
167 }
168
169 if let Some(c2) = &self.c2 {
173 let mut table_it = c2.iter();
174 let key = K::create_key(key);
175 table_it.seek(&key);
176 if let Some(it_key) = table_it.current_key()
177 && it_key == key.as_ref()
178 {
179 return Ok(true);
180 }
181 }
182 Ok(false)
183 }
184
185 pub fn remove(&mut self, key: &K) -> Result<Option<V>> {
186 let existing = self.get(key)?.map(|existing| existing.into_owned());
187 if existing.is_some() {
188 self.c0.insert(key.clone(), None);
190
191 self.evict_c0_if_necessary()?;
192 }
193
194 Ok(existing)
195 }
196
197 pub fn iter(&self) -> Result<ResultIterator<'_, K, V>> {
198 if let Some(c1) = &self.c1 {
199 if self.c0.is_empty() && self.c2.is_none() {
200 let it = c1
202 .range(..)?
203 .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
204 .map(|entry| entry.map_err(|e| e.into()));
205
206 return Ok(Box::new(it));
207 }
208 } else if let Some(c2) = &self.c2 {
209 if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
210 let table_iterator = c2.iter();
211 let it = SingleTableIterator {
212 table_iterator,
213 serialization: self.serialization,
214 phantom: PhantomData,
215 };
216 return Ok(Box::new(it));
217 }
218 } else {
219 let it = self
221 .c0
222 .iter()
223 .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
224 return Ok(Box::new(it));
225 }
226
227 Ok(Box::new(self.range(..)))
229 }
230
231 pub fn range<'a, R>(&'a self, range: R) -> Box<dyn Iterator<Item = Result<(K, V)>> + 'a>
233 where
234 R: RangeBounds<K> + Clone,
235 {
236 if let Some(c1) = &self.c1 {
238 if self.c0.is_empty() && self.c2.is_none() {
239 let c1_range = match c1.range(range).map_err(|e| e.into()) {
240 Ok(c1_range) => c1_range,
241 Err(e) => return Box::new(std::iter::once(Err(e))),
242 };
243 let it = c1_range
245 .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
246 .map(|entry| entry.map_err(|e| e.into()));
247 return Box::new(it);
248 }
249 } else if let Some(c2) = &self.c2 {
250 if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
251 let mapped_start_bound: std::ops::Bound<KeyVec> = match range.start_bound() {
252 Bound::Included(end) => Bound::Included(K::create_key(end)),
253 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
254 Bound::Unbounded => Bound::Unbounded,
255 };
256
257 let mapped_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
258 Bound::Included(end) => Bound::Included(K::create_key(end)),
259 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
260 Bound::Unbounded => Bound::Unbounded,
261 };
262
263 return Box::new(SimplifiedRange::new(
265 mapped_start_bound,
266 mapped_end_bound,
267 c2,
268 self.serialization,
269 ));
270 }
271 } else {
272 let it = self
275 .c0
276 .range(range)
277 .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
278 return Box::new(it);
279 }
280 match CombinedRange::new(
282 range,
283 &self.c0,
284 self.c1.as_ref(),
285 self.c2.as_ref(),
286 self.serialization,
287 ) {
288 Ok(result) => Box::new(result),
289 Err(e) => Box::new(std::iter::once(Err(e))),
290 }
291 }
292
293 pub fn is_empty(&self) -> Result<bool> {
294 if self.c0.is_empty() && self.c1.is_none() && self.c2.is_none() {
295 return Ok(true);
296 }
297 let mut it = self.iter()?;
298 Ok(it.next().is_none())
299 }
300
301 pub fn clear(&mut self) {
302 self.c0.clear();
303 self.c1 = None;
304 self.c2 = None;
305 }
306
307 pub fn write_to(&self, location: &Path) -> Result<()> {
308 if let Some(parent) = location.parent() {
310 std::fs::create_dir_all(parent)?;
311 }
312 let out_file = std::fs::OpenOptions::new()
314 .write(true)
315 .read(true)
316 .create(true)
317 .truncate(true)
318 .open(location)?;
319 let mut builder = TableBuilder::new(custom_options(self.block_cache_capacity), out_file);
320 for entry in self.iter()? {
321 let (key, value) = entry?;
322 let key = key.create_key();
323 let value = Some(value);
324 builder.add(&key, &self.serialization.serialize(&value)?)?;
325 }
326 builder.finish()?;
327
328 Ok(())
329 }
330
331 pub fn compact(&mut self) -> Result<()> {
333 debug!("Evicting C0 and merging it with existing C1 to a temporary file");
334
335 if self.c1.is_none() {
336 let c1 = BtreeIndex::with_capacity(self.c1_btree_config.clone(), self.c0.len())?;
337 self.c1 = Some(c1);
338 }
339
340 if let Some(c1) = self.c1.as_mut() {
341 let mut c0 = BTreeMap::new();
342 std::mem::swap(&mut self.c0, &mut c0);
343 for (k, v) in c0.into_iter() {
344 c1.insert(k, v)?;
345 }
346 }
347
348 debug!("Finished evicting C0");
349 Ok(())
350 }
351
352 fn evict_c0_if_necessary(&mut self) -> Result<()> {
353 let evict_c0 = match self.eviction_strategy {
354 EvictionStrategy::MaximumItems(n) => self.c0.len() >= n,
355 };
356
357 if evict_c0 {
358 self.compact()?;
359 }
360
361 Ok(())
362 }
363}
364
/// Iterator over all entries of a single sstable, used when only C2 holds data.
struct SingleTableIterator<K, V> {
    /// Iterator over the raw (serialized key, serialized value) pairs of the table.
    table_iterator: TableIterator,
    /// bincode options used to decode the stored values.
    serialization: bincode::config::DefaultOptions,
    /// Ties the key and value types to the iterator without storing them.
    phantom: std::marker::PhantomData<(K, V)>,
}
371
372impl<K, V> Iterator for SingleTableIterator<K, V>
373where
374 for<'de> K: 'static + Clone + KeySerializer + Send,
375 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
376{
377 type Item = Result<(K, V)>;
378
379 fn next(&mut self) -> Option<Self::Item> {
380 while let Some((key, value)) = self.table_iterator.next() {
381 let key = match K::parse_key(&key) {
382 Ok(key) => key,
383 Err(e) => return Some(Err(e.into())),
384 };
385 let value: Option<V> = match self.serialization.deserialize(&value) {
386 Ok(value) => value,
387 Err(e) => return Some(Err(e.into())),
388 };
389 if let Some(value) = value {
390 return Some(Ok((key, value)));
391 }
392 }
393 None
394 }
395}
396
// Marker impl: declares that `next()` keeps returning `None` once it has returned `None`.
// NOTE(review): this relies on the wrapped `TableIterator` behaving that way — confirm.
impl<K, V> FusedIterator for SingleTableIterator<K, V>
where
    K: 'static + Clone + KeySerializer + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
}
403
/// Boxed iterator over fallible key/value pairs that may borrow data for the lifetime `'a`.
type ResultIterator<'a, K, V> = Box<dyn Iterator<Item = Result<(K, V)>> + 'a>;
405
/// Iterator that merges the entries of all three levels of a `DiskMap` in key order.
///
/// If the same key occurs in several levels, the entry of the earliest level (C0 before C1
/// before C2) is used. Deletion markers in C0 and C1 hide the key entirely.
pub struct CombinedRange<'a, K, V>
where
    for<'de> K: 'static + Clone + KeySerializer + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
    /// Entries of the in-memory level; `None` values are deletion markers.
    c0_iterator: Peekable<std::collections::btree_map::Range<'a, K, Option<V>>>,
    /// Entries of C1 (empty iterator if C1 does not exist); `None` values are deletion markers.
    c1_iterator: Peekable<ResultIterator<'a, K, Option<V>>>,
    /// Entries of C2 (empty iterator if C2 does not exist).
    c2_iterator: Peekable<ResultIterator<'a, K, V>>,
}
415
416impl<'a, K, V> CombinedRange<'a, K, V>
417where
418 for<'de> K: 'static + Clone + KeySerializer + Serialize + Deserialize<'de> + Send + Sync + Ord,
419 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
420{
421 fn new<R: RangeBounds<K> + Clone>(
422 range: R,
423 c0: &'a BTreeMap<K, Option<V>>,
424 c1: Option<&'a BtreeIndex<K, Option<V>>>,
425 c2: Option<&Table>,
426 serialization: bincode::config::DefaultOptions,
427 ) -> Result<CombinedRange<'a, K, V>> {
428 let c1_iterator: Box<dyn Iterator<Item = Result<(K, Option<V>)>>> = if let Some(c1) = c1 {
429 let it = c1
430 .range(range.clone())?
431 .map(|entry| entry.map_err(|e| e.into()));
432 Box::new(it)
433 } else {
434 Box::new(std::iter::empty())
435 };
436
437 let c2: Box<dyn Iterator<Item = Result<(K, V)>>> = if let Some(c2) = c2 {
438 let table_start_bound = match range.start_bound() {
439 Bound::Included(end) => Bound::Included(K::create_key(end)),
440 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
441 Bound::Unbounded => Bound::Unbounded,
442 };
443
444 let table_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
445 Bound::Included(end) => Bound::Included(K::create_key(end)),
446 Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
447 Bound::Unbounded => Bound::Unbounded,
448 };
449
450 let it = SimplifiedRange::new(table_start_bound, table_end_bound, c2, serialization);
451 Box::new(it)
452 } else {
453 Box::new(std::iter::empty())
454 };
455
456 Ok(CombinedRange {
457 c0_iterator: c0.range(range).peekable(),
458 c1_iterator: c1_iterator.peekable(),
459 c2_iterator: c2.peekable(),
460 })
461 }
462}
463
464impl<K, V> Iterator for CombinedRange<'_, K, V>
465where
466 K: Ord,
467 for<'de> K: 'static + Clone + KeySerializer + Send,
468 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
469{
470 type Item = Result<(K, V)>;
471
472 fn next(&mut self) -> Option<Self::Item> {
473 while self.c0_iterator.peek().is_some()
474 || self.c1_iterator.peek().is_some()
475 || self.c2_iterator.peek().is_some()
476 {
477 let c0 = self.c0_iterator.peek().map(|(k, _v)| Some(*k));
479 let c1 = self.c1_iterator.peek().map(|entry| match entry {
480 Ok((k, _v)) => Some(k),
481 Err(_) => None,
482 });
483 let c3 = self.c2_iterator.peek().map(|entry| match entry {
484 Ok((k, _v)) => Some(k),
485 Err(_) => None,
486 });
487
488 let min_key = vec![c0, c1, c3].into_iter().flatten().min();
489 if let Some(min_key) = min_key {
490 let c0_is_min = c0 == Some(min_key);
491 let c1_is_min = c1 == Some(min_key);
492 let c3_is_min = c3 == Some(min_key);
493
494 let c0 = if c0_is_min {
496 self.c0_iterator.next()
497 } else {
498 None
499 };
500 let c1 = if c1_is_min {
501 self.c1_iterator.next()
502 } else {
503 None
504 };
505 let c3 = if c3_is_min {
506 self.c2_iterator.next()
507 } else {
508 None
509 };
510
511 if let Some((k, v)) = c0 {
513 if let Some(v) = v {
514 return Some(Ok((k.clone(), v.clone())));
515 } else {
516 continue;
518 }
519 } else if let Some(entry) = c1 {
520 match entry {
521 Ok((k, v)) => {
522 if let Some(v) = v {
523 return Some(Ok((k, v)));
524 } else {
525 continue;
527 }
528 }
529 Err(e) => {
530 return Some(Err(e));
531 }
532 };
533 } else if let Some(entry) = c3 {
534 match entry {
535 Ok((k, v)) => {
536 return Some(Ok((k, v)));
537 }
538 Err(e) => {
539 return Some(Err(e));
540 }
541 }
542 }
543 }
544 }
545 None
546 }
547}
548
// Marker impl: declares that `next()` keeps returning `None` once it has returned `None`.
// NOTE(review): this relies on the wrapped level iterators behaving that way — confirm.
impl<K, V> FusedIterator for CombinedRange<'_, K, V>
where
    K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
}
555
556impl<K, V> Default for DiskMap<K, V>
557where
558 K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync,
559 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
560{
561 fn default() -> Self {
562 DiskMap::new_temporary(
563 EvictionStrategy::default(),
564 DEFAULT_BLOCK_CACHE_CAPACITY,
565 BtreeConfig::default(),
566 )
567 }
568}
569
/// Iterator over the entries of a single sstable whose serialized key lies within a range.
struct SimplifiedRange<K, V> {
    /// Lower bound of the range, as serialized key.
    range_start: Bound<KeyVec>,
    /// Upper bound of the range, as serialized key.
    range_end: Bound<KeyVec>,
    /// Table iterator, positioned by `new()` and advanced by `next()`.
    table_it: TableIterator,
    /// Set once it is known that no further entry lies inside the range.
    exhausted: bool,
    /// bincode options used to decode the stored values.
    serialization: bincode::config::DefaultOptions,

    /// Buffer that receives the serialized key of the current table entry.
    current_key: Vec<u8>,
    /// Buffer that receives the serialized value of the current table entry.
    current_value: Vec<u8>,

    /// Ties the key and value types to the iterator without storing them.
    phantom: std::marker::PhantomData<(K, V)>,
}
583
584impl<K, V> SimplifiedRange<K, V>
585where
586 for<'de> K: 'static + Clone + KeySerializer + Send,
587 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
588{
589 fn new(
590 range_start: Bound<KeyVec>,
591 range_end: Bound<KeyVec>,
592 disk_table: &Table,
593 serialization: bincode::config::DefaultOptions,
594 ) -> SimplifiedRange<K, V> {
595 let mut table_it = disk_table.iter();
596 let mut exhausted = false;
597
598 match &range_start {
600 Bound::Included(start) => {
601 let start: &[u8] = start;
602 let mut key = Vec::default();
603 let mut value = Vec::default();
604
605 table_it.seek(start);
606
607 if table_it.valid() && table_it.current(&mut key, &mut value) {
608 let key: &[u8] = &key;
609 let start_included = match &range_start {
611 Bound::Included(start) => {
612 let start: &[u8] = start;
613 key >= start
614 }
615 Bound::Excluded(start) => {
616 let start: &[u8] = start;
617 key > start
618 }
619 Bound::Unbounded => true,
620 };
621 let end_included = match &range_end {
622 Bound::Included(end) => {
623 let end: &[u8] = end;
624 key <= end
625 }
626 Bound::Excluded(end) => {
627 let end: &[u8] = end;
628 key < end
629 }
630 Bound::Unbounded => true,
631 };
632 if !start_included || !end_included {
633 exhausted = true;
634 }
635 } else {
636 exhausted = true;
638 }
639 }
640 Bound::Excluded(start_bound) => {
641 let start_bound: &[u8] = start_bound;
642
643 let mut key: Vec<u8> = Vec::default();
644 let mut value = Vec::default();
645
646 table_it.seek(start_bound);
647 if table_it.valid() && table_it.current(&mut key, &mut value) {
648 let key: &[u8] = &key;
649 if key == start_bound {
650 table_it.advance();
652 }
653 }
654
655 if table_it.valid() && table_it.current(&mut key, &mut value) {
657 let key: &[u8] = &key;
658
659 let start_included = match &range_start {
661 Bound::Included(start) => {
662 let start: &[u8] = start;
663 key >= start
664 }
665 Bound::Excluded(start) => {
666 let start: &[u8] = start;
667 key > start
668 }
669 Bound::Unbounded => true,
670 };
671 let end_included = match &range_end {
672 Bound::Included(end) => {
673 let end: &[u8] = end;
674 key <= end
675 }
676 Bound::Excluded(end) => {
677 let end: &[u8] = end;
678 key < end
679 }
680 Bound::Unbounded => true,
681 };
682 if !start_included || !end_included {
683 exhausted = true;
684 }
685 } else {
686 exhausted = true;
688 }
689 }
690 Bound::Unbounded => {
691 table_it.seek_to_first();
692
693 if !table_it.valid() {
694 exhausted = true;
695 }
696 }
697 };
698
699 SimplifiedRange {
700 range_start,
701 range_end,
702 exhausted,
703 table_it,
704 serialization,
705 current_key: Vec::new(),
706 current_value: Vec::new(),
707 phantom: std::marker::PhantomData,
708 }
709 }
710
711 fn range_contains(&self, item: &[u8]) -> bool {
712 (match &self.range_start {
713 Bound::Included(start) => start.as_slice() <= item,
714 Bound::Excluded(start) => start.as_slice() < item,
715 Bound::Unbounded => true,
716 }) && (match &self.range_end {
717 Bound::Included(end) => item <= end.as_ref(),
718 Bound::Excluded(end) => item < end.as_ref(),
719 Bound::Unbounded => true,
720 })
721 }
722}
723
724impl<K, V> Iterator for SimplifiedRange<K, V>
725where
726 for<'de> K: 'static + Clone + KeySerializer + Send,
727 for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
728{
729 type Item = Result<(K, V)>;
730
731 fn next(&mut self) -> Option<Self::Item> {
732 while !self.exhausted && self.table_it.valid() {
733 if self
734 .table_it
735 .current(&mut self.current_key, &mut self.current_value)
736 {
737 if self.range_contains(&self.current_key) {
738 let value: Option<V> = match self.serialization.deserialize(&self.current_value)
739 {
740 Ok(value) => value,
741 Err(e) => return Some(Err(e.into())),
742 };
743
744 self.table_it.advance();
745
746 if let Some(value) = value {
747 let key = match K::parse_key(&self.current_key) {
748 Ok(key) => key,
749 Err(e) => return Some(Err(e.into())),
750 };
751 return Some(Ok((key, value)));
752 }
753 } else {
754 self.exhausted = true;
755 }
756 }
757 }
758 None
759 }
760}
761
// Marker impl: declares that `next()` keeps returning `None` once it has returned `None`.
// NOTE(review): this relies on `TableIterator::valid()` staying false after the end of the
// table has been reached — confirm.
impl<K, V> FusedIterator for SimplifiedRange<K, V>
where
    K: 'static + Clone + KeySerializer + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
}
768
769#[cfg(test)]
770mod tests;