graphannis_core/util/
disk_collections.rs

1use bincode::config::Options;
2use itertools::Itertools;
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use sstable::{SSIterator, Table, TableBuilder, TableIterator};
6use transient_btree_index::{BtreeConfig, BtreeIndex};
7
8use crate::serializer::KeyVec;
9use crate::{errors::Result, serializer::KeySerializer};
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::iter::{FusedIterator, Peekable};
13use std::marker::PhantomData;
14use std::ops::{Bound, RangeBounds};
15use std::path::Path;
16
17const KB: usize = 1 << 10;
18pub const MB: usize = KB * KB;
19const BLOCK_MAX_SIZE: usize = 4 * KB;
20
21/// Uses a cache for each disk table with 8 MB capacity.
22pub const DEFAULT_BLOCK_CACHE_CAPACITY: usize = 8 * MB;
23
24pub enum EvictionStrategy {
25    MaximumItems(usize),
26}
27
28impl Default for EvictionStrategy {
29    fn default() -> Self {
30        EvictionStrategy::MaximumItems(10_000)
31    }
32}
33
34pub struct DiskMap<K, V>
35where
36    K: 'static + KeySerializer + Serialize + DeserializeOwned + Clone + Send + Sync + Ord,
37    for<'de> V: 'static + Serialize + Deserialize<'de> + Clone + Send + Sync,
38{
39    eviction_strategy: EvictionStrategy,
40    block_cache_capacity: usize,
41    c0: BTreeMap<K, Option<V>>,
42    c1: Option<BtreeIndex<K, Option<V>>>,
43    c2: Option<Table>,
44    serialization: bincode::config::DefaultOptions,
45
46    c1_btree_config: BtreeConfig,
47}
48
49fn custom_options(block_cache_capacity: usize) -> sstable::Options {
50    let blocks = (block_cache_capacity / BLOCK_MAX_SIZE).max(1);
51    sstable::Options::default().with_cache_capacity(blocks)
52}
53
54impl<K, V> DiskMap<K, V>
55where
56    K: 'static + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync + Ord,
57    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
58{
59    pub fn new(
60        persisted_file: Option<&Path>,
61        eviction_strategy: EvictionStrategy,
62        block_cache_capacity: usize,
63        c1_config: BtreeConfig,
64    ) -> Result<DiskMap<K, V>> {
65        let mut disk_table = None;
66
67        if let Some(persisted_file) = persisted_file
68            && persisted_file.is_file()
69        {
70            // Use existing file as read-only table which contains the whole map
71            let table = Table::new_from_file(custom_options(block_cache_capacity), persisted_file)?;
72            disk_table = Some(table);
73        }
74
75        Ok(DiskMap {
76            eviction_strategy,
77            block_cache_capacity,
78            c0: BTreeMap::default(),
79            c2: disk_table,
80            serialization: bincode::options(),
81            c1: None,
82            c1_btree_config: c1_config,
83        })
84    }
85
86    pub fn new_temporary(
87        eviction_strategy: EvictionStrategy,
88        block_cache_capacity: usize,
89        c1_config: BtreeConfig,
90    ) -> DiskMap<K, V> {
91        DiskMap {
92            eviction_strategy,
93            block_cache_capacity,
94            c0: BTreeMap::default(),
95            c2: None,
96            serialization: bincode::options(),
97            c1: None,
98            c1_btree_config: c1_config,
99        }
100    }
101
102    pub fn insert(&mut self, key: K, value: V) -> Result<()> {
103        self.c0.insert(key, Some(value));
104
105        self.evict_c0_if_necessary()?;
106
107        Ok(())
108    }
109
110    pub fn get(&self, key: &K) -> Result<Option<Cow<'_, V>>> {
111        // Check C0 first
112        if let Some(entry) = self.c0.get(key) {
113            if let Some(value) = entry {
114                return Ok(Some(Cow::Borrowed(value)));
115            } else {
116                // Value was explicitly deleted with a tombstone entry.
117                // Do not query C1 and C2.
118                return Ok(None);
119            }
120        }
121        // Check C1 (BTree disk index)
122        if let Some(c1) = &self.c1
123            && let Some(entry) = c1.get(key)?
124        {
125            if let Some(value) = entry {
126                return Ok(Some(Cow::Owned(value)));
127            } else {
128                // Value was explicitly deleted with a tombstone entry.
129                // Do not query C1 and C2.
130                return Ok(None);
131            }
132        }
133
134        // Check the C2 (sstable)
135        if let Some(c2) = &self.c2 {
136            let key = K::create_key(key);
137            if let Some(value) = c2.get(&key)? {
138                let value: Option<V> = self.serialization.deserialize(&value)?;
139                if let Some(value) = value {
140                    return Ok(Some(Cow::Owned(value)));
141                } else {
142                    // Value was explicitly deleted (and still written to disk)
143                    return Ok(None);
144                }
145            }
146        }
147
148        Ok(None)
149    }
150
151    pub fn contains_key(&self, key: &K) -> Result<bool> {
152        // Check C0 first
153        if let Some(value) = self.c0.get(key) {
154            if value.is_some() {
155                return Ok(true);
156            } else {
157                // Value was explicitly deleted, do not query the disk tables
158                return Ok(false);
159            }
160        }
161
162        // Check C1 (BTree disk index)
163        if let Some(c1) = &self.c1
164            && c1.contains_key(key)?
165        {
166            return Ok(true);
167        }
168
169        // Use a iterator on the single disk to check if there is an entry with this, without getting the value.
170        // Since we don't serialize tombstone entries when compacting or writing the disk table to an output file,
171        // when we are checking the key, we can safely assume the value is Some() and not None.
172        if let Some(c2) = &self.c2 {
173            let mut table_it = c2.iter();
174            let key = K::create_key(key);
175            table_it.seek(&key);
176            if let Some(it_key) = table_it.current_key()
177                && it_key == key.as_ref()
178            {
179                return Ok(true);
180            }
181        }
182        Ok(false)
183    }
184
185    pub fn remove(&mut self, key: &K) -> Result<Option<V>> {
186        let existing = self.get(key)?.map(|existing| existing.into_owned());
187        if existing.is_some() {
188            // Add tombstone entry
189            self.c0.insert(key.clone(), None);
190
191            self.evict_c0_if_necessary()?;
192        }
193
194        Ok(existing)
195    }
196
197    pub fn iter(&self) -> Result<ResultIterator<'_, K, V>> {
198        if let Some(c1) = &self.c1 {
199            if self.c0.is_empty() && self.c2.is_none() {
200                // Create an iterator that skips the thombstone entries
201                let it = c1
202                    .range(..)?
203                    .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
204                    .map(|entry| entry.map_err(|e| e.into()));
205
206                return Ok(Box::new(it));
207            }
208        } else if let Some(c2) = &self.c2 {
209            if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
210                let table_iterator = c2.iter();
211                let it = SingleTableIterator {
212                    table_iterator,
213                    serialization: self.serialization,
214                    phantom: PhantomData,
215                };
216                return Ok(Box::new(it));
217            }
218        } else {
219            // Create an iterator that skips the thombstone entries
220            let it = self
221                .c0
222                .iter()
223                .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
224            return Ok(Box::new(it));
225        }
226
227        // Use the flexible range iterator as default
228        Ok(Box::new(self.range(..)))
229    }
230
231    /// Returns an iterator over a range of entries.
232    pub fn range<'a, R>(&'a self, range: R) -> Box<dyn Iterator<Item = Result<(K, V)>> + 'a>
233    where
234        R: RangeBounds<K> + Clone,
235    {
236        // Check if C0, C1 or C2 are the only non-empty maps and return a specialized iterator
237        if let Some(c1) = &self.c1 {
238            if self.c0.is_empty() && self.c2.is_none() {
239                let c1_range = match c1.range(range).map_err(|e| e.into()) {
240                    Ok(c1_range) => c1_range,
241                    Err(e) => return Box::new(std::iter::once(Err(e))),
242                };
243                // Return iterator over C1 that skips the tombstone entries
244                let it = c1_range
245                    .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
246                    .map(|entry| entry.map_err(|e| e.into()));
247                return Box::new(it);
248            }
249        } else if let Some(c2) = &self.c2 {
250            if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
251                let mapped_start_bound: std::ops::Bound<KeyVec> = match range.start_bound() {
252                    Bound::Included(end) => Bound::Included(K::create_key(end)),
253                    Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
254                    Bound::Unbounded => Bound::Unbounded,
255                };
256
257                let mapped_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
258                    Bound::Included(end) => Bound::Included(K::create_key(end)),
259                    Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
260                    Bound::Unbounded => Bound::Unbounded,
261                };
262
263                // Return iterator over C2
264                return Box::new(SimplifiedRange::new(
265                    mapped_start_bound,
266                    mapped_end_bound,
267                    c2,
268                    self.serialization,
269                ));
270            }
271        } else {
272            // Neither C1 nor C2 exist:
273            // return range iterator over all C0 entries, but skip the tombestone entries
274            let it = self
275                .c0
276                .range(range)
277                .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
278            return Box::new(it);
279        }
280        // Use a combined iterator as default
281        match CombinedRange::new(
282            range,
283            &self.c0,
284            self.c1.as_ref(),
285            self.c2.as_ref(),
286            self.serialization,
287        ) {
288            Ok(result) => Box::new(result),
289            Err(e) => Box::new(std::iter::once(Err(e))),
290        }
291    }
292
293    pub fn is_empty(&self) -> Result<bool> {
294        if self.c0.is_empty() && self.c1.is_none() && self.c2.is_none() {
295            return Ok(true);
296        }
297        let mut it = self.iter()?;
298        Ok(it.next().is_none())
299    }
300
301    pub fn clear(&mut self) {
302        self.c0.clear();
303        self.c1 = None;
304        self.c2 = None;
305    }
306
307    pub fn write_to(&self, location: &Path) -> Result<()> {
308        // Make sure the parent directory exist
309        if let Some(parent) = location.parent() {
310            std::fs::create_dir_all(parent)?;
311        }
312        // Open file as writable
313        let out_file = std::fs::OpenOptions::new()
314            .write(true)
315            .read(true)
316            .create(true)
317            .truncate(true)
318            .open(location)?;
319        let mut builder = TableBuilder::new(custom_options(self.block_cache_capacity), out_file);
320        for entry in self.iter()? {
321            let (key, value) = entry?;
322            let key = key.create_key();
323            let value = Some(value);
324            builder.add(&key, &self.serialization.serialize(&value)?)?;
325        }
326        builder.finish()?;
327
328        Ok(())
329    }
330
331    /// Compact the existing disk tables and the in-memory table to a single temporary disk table.
332    pub fn compact(&mut self) -> Result<()> {
333        debug!("Evicting C0 and merging it with existing C1 to a temporary file");
334
335        if self.c1.is_none() {
336            let c1 = BtreeIndex::with_capacity(self.c1_btree_config.clone(), self.c0.len())?;
337            self.c1 = Some(c1);
338        }
339
340        if let Some(c1) = self.c1.as_mut() {
341            let mut c0 = BTreeMap::new();
342            std::mem::swap(&mut self.c0, &mut c0);
343            for (k, v) in c0.into_iter() {
344                c1.insert(k, v)?;
345            }
346        }
347
348        debug!("Finished evicting C0");
349        Ok(())
350    }
351
352    fn evict_c0_if_necessary(&mut self) -> Result<()> {
353        let evict_c0 = match self.eviction_strategy {
354            EvictionStrategy::MaximumItems(n) => self.c0.len() >= n,
355        };
356
357        if evict_c0 {
358            self.compact()?;
359        }
360
361        Ok(())
362    }
363}
364
365/// Implements an optimized iterator a single disk table.
366struct SingleTableIterator<K, V> {
367    table_iterator: TableIterator,
368    serialization: bincode::config::DefaultOptions,
369    phantom: std::marker::PhantomData<(K, V)>,
370}
371
372impl<K, V> Iterator for SingleTableIterator<K, V>
373where
374    for<'de> K: 'static + Clone + KeySerializer + Send,
375    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
376{
377    type Item = Result<(K, V)>;
378
379    fn next(&mut self) -> Option<Self::Item> {
380        while let Some((key, value)) = self.table_iterator.next() {
381            let key = match K::parse_key(&key) {
382                Ok(key) => key,
383                Err(e) => return Some(Err(e.into())),
384            };
385            let value: Option<V> = match self.serialization.deserialize(&value) {
386                Ok(value) => value,
387                Err(e) => return Some(Err(e.into())),
388            };
389            if let Some(value) = value {
390                return Some(Ok((key, value)));
391            }
392        }
393        None
394    }
395}
396
397impl<K, V> FusedIterator for SingleTableIterator<K, V>
398where
399    K: 'static + Clone + KeySerializer + Send,
400    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
401{
402}
403
404type ResultIterator<'a, K, V> = Box<dyn Iterator<Item = Result<(K, V)>> + 'a>;
405
406pub struct CombinedRange<'a, K, V>
407where
408    for<'de> K: 'static + Clone + KeySerializer + Send,
409    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
410{
411    c0_iterator: Peekable<std::collections::btree_map::Range<'a, K, Option<V>>>,
412    c1_iterator: Peekable<ResultIterator<'a, K, Option<V>>>,
413    c2_iterator: Peekable<ResultIterator<'a, K, V>>,
414}
415
416impl<'a, K, V> CombinedRange<'a, K, V>
417where
418    for<'de> K: 'static + Clone + KeySerializer + Serialize + Deserialize<'de> + Send + Sync + Ord,
419    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
420{
421    fn new<R: RangeBounds<K> + Clone>(
422        range: R,
423        c0: &'a BTreeMap<K, Option<V>>,
424        c1: Option<&'a BtreeIndex<K, Option<V>>>,
425        c2: Option<&Table>,
426        serialization: bincode::config::DefaultOptions,
427    ) -> Result<CombinedRange<'a, K, V>> {
428        let c1_iterator: Box<dyn Iterator<Item = Result<(K, Option<V>)>>> = if let Some(c1) = c1 {
429            let it = c1
430                .range(range.clone())?
431                .map(|entry| entry.map_err(|e| e.into()));
432            Box::new(it)
433        } else {
434            Box::new(std::iter::empty())
435        };
436
437        let c2: Box<dyn Iterator<Item = Result<(K, V)>>> = if let Some(c2) = c2 {
438            let table_start_bound = match range.start_bound() {
439                Bound::Included(end) => Bound::Included(K::create_key(end)),
440                Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
441                Bound::Unbounded => Bound::Unbounded,
442            };
443
444            let table_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
445                Bound::Included(end) => Bound::Included(K::create_key(end)),
446                Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
447                Bound::Unbounded => Bound::Unbounded,
448            };
449
450            let it = SimplifiedRange::new(table_start_bound, table_end_bound, c2, serialization);
451            Box::new(it)
452        } else {
453            Box::new(std::iter::empty())
454        };
455
456        Ok(CombinedRange {
457            c0_iterator: c0.range(range).peekable(),
458            c1_iterator: c1_iterator.peekable(),
459            c2_iterator: c2.peekable(),
460        })
461    }
462}
463
464impl<K, V> Iterator for CombinedRange<'_, K, V>
465where
466    K: Ord,
467    for<'de> K: 'static + Clone + KeySerializer + Send,
468    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
469{
470    type Item = Result<(K, V)>;
471
472    fn next(&mut self) -> Option<Self::Item> {
473        while self.c0_iterator.peek().is_some()
474            || self.c1_iterator.peek().is_some()
475            || self.c2_iterator.peek().is_some()
476        {
477            // Get keys from all iterators and determine which is the smallest one
478            let c0 = self.c0_iterator.peek().map(|(k, _v)| Some(*k));
479            let c1 = self.c1_iterator.peek().map(|entry| match entry {
480                Ok((k, _v)) => Some(k),
481                Err(_) => None,
482            });
483            let c3 = self.c2_iterator.peek().map(|entry| match entry {
484                Ok((k, _v)) => Some(k),
485                Err(_) => None,
486            });
487
488            let min_key = vec![c0, c1, c3].into_iter().flatten().min();
489            if let Some(min_key) = min_key {
490                let c0_is_min = c0 == Some(min_key);
491                let c1_is_min = c1 == Some(min_key);
492                let c3_is_min = c3 == Some(min_key);
493
494                // Advance all iterators with the same (minimal) key
495                let c0 = if c0_is_min {
496                    self.c0_iterator.next()
497                } else {
498                    None
499                };
500                let c1 = if c1_is_min {
501                    self.c1_iterator.next()
502                } else {
503                    None
504                };
505                let c3 = if c3_is_min {
506                    self.c2_iterator.next()
507                } else {
508                    None
509                };
510
511                // Output the value from the most recent map
512                if let Some((k, v)) = c0 {
513                    if let Some(v) = v {
514                        return Some(Ok((k.clone(), v.clone())));
515                    } else {
516                        // Value was explicitly deleted, do not check the other maps
517                        continue;
518                    }
519                } else if let Some(entry) = c1 {
520                    match entry {
521                        Ok((k, v)) => {
522                            if let Some(v) = v {
523                                return Some(Ok((k, v)));
524                            } else {
525                                // Value was explicitly deleted, do not check the other maps
526                                continue;
527                            }
528                        }
529                        Err(e) => {
530                            return Some(Err(e));
531                        }
532                    };
533                } else if let Some(entry) = c3 {
534                    match entry {
535                        Ok((k, v)) => {
536                            return Some(Ok((k, v)));
537                        }
538                        Err(e) => {
539                            return Some(Err(e));
540                        }
541                    }
542                }
543            }
544        }
545        None
546    }
547}
548
549impl<K, V> FusedIterator for CombinedRange<'_, K, V>
550where
551    K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send,
552    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
553{
554}
555
556impl<K, V> Default for DiskMap<K, V>
557where
558    K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync,
559    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
560{
561    fn default() -> Self {
562        DiskMap::new_temporary(
563            EvictionStrategy::default(),
564            DEFAULT_BLOCK_CACHE_CAPACITY,
565            BtreeConfig::default(),
566        )
567    }
568}
569
570/// An iterator implementation for the case that there is only a single disk-table and no C0
571struct SimplifiedRange<K, V> {
572    range_start: Bound<KeyVec>,
573    range_end: Bound<KeyVec>,
574    table_it: TableIterator,
575    exhausted: bool,
576    serialization: bincode::config::DefaultOptions,
577
578    current_key: Vec<u8>,
579    current_value: Vec<u8>,
580
581    phantom: std::marker::PhantomData<(K, V)>,
582}
583
584impl<K, V> SimplifiedRange<K, V>
585where
586    for<'de> K: 'static + Clone + KeySerializer + Send,
587    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
588{
589    fn new(
590        range_start: Bound<KeyVec>,
591        range_end: Bound<KeyVec>,
592        disk_table: &Table,
593        serialization: bincode::config::DefaultOptions,
594    ) -> SimplifiedRange<K, V> {
595        let mut table_it = disk_table.iter();
596        let mut exhausted = false;
597
598        // Initialize the table iterators
599        match &range_start {
600            Bound::Included(start) => {
601                let start: &[u8] = start;
602                let mut key = Vec::default();
603                let mut value = Vec::default();
604
605                table_it.seek(start);
606
607                if table_it.valid() && table_it.current(&mut key, &mut value) {
608                    let key: &[u8] = &key;
609                    // Check if the seeked element is actually part of the range
610                    let start_included = match &range_start {
611                        Bound::Included(start) => {
612                            let start: &[u8] = start;
613                            key >= start
614                        }
615                        Bound::Excluded(start) => {
616                            let start: &[u8] = start;
617                            key > start
618                        }
619                        Bound::Unbounded => true,
620                    };
621                    let end_included = match &range_end {
622                        Bound::Included(end) => {
623                            let end: &[u8] = end;
624                            key <= end
625                        }
626                        Bound::Excluded(end) => {
627                            let end: &[u8] = end;
628                            key < end
629                        }
630                        Bound::Unbounded => true,
631                    };
632                    if !start_included || !end_included {
633                        exhausted = true;
634                    }
635                } else {
636                    // Seeked behind last element
637                    exhausted = true;
638                }
639            }
640            Bound::Excluded(start_bound) => {
641                let start_bound: &[u8] = start_bound;
642
643                let mut key: Vec<u8> = Vec::default();
644                let mut value = Vec::default();
645
646                table_it.seek(start_bound);
647                if table_it.valid() && table_it.current(&mut key, &mut value) {
648                    let key: &[u8] = &key;
649                    if key == start_bound {
650                        // We need to exclude the first match
651                        table_it.advance();
652                    }
653                }
654
655                // Check key after advance
656                if table_it.valid() && table_it.current(&mut key, &mut value) {
657                    let key: &[u8] = &key;
658
659                    // Check if the seeked element is actually part of the range
660                    let start_included = match &range_start {
661                        Bound::Included(start) => {
662                            let start: &[u8] = start;
663                            key >= start
664                        }
665                        Bound::Excluded(start) => {
666                            let start: &[u8] = start;
667                            key > start
668                        }
669                        Bound::Unbounded => true,
670                    };
671                    let end_included = match &range_end {
672                        Bound::Included(end) => {
673                            let end: &[u8] = end;
674                            key <= end
675                        }
676                        Bound::Excluded(end) => {
677                            let end: &[u8] = end;
678                            key < end
679                        }
680                        Bound::Unbounded => true,
681                    };
682                    if !start_included || !end_included {
683                        exhausted = true;
684                    }
685                } else {
686                    // Seeked behind last element
687                    exhausted = true;
688                }
689            }
690            Bound::Unbounded => {
691                table_it.seek_to_first();
692
693                if !table_it.valid() {
694                    exhausted = true;
695                }
696            }
697        };
698
699        SimplifiedRange {
700            range_start,
701            range_end,
702            exhausted,
703            table_it,
704            serialization,
705            current_key: Vec::new(),
706            current_value: Vec::new(),
707            phantom: std::marker::PhantomData,
708        }
709    }
710
711    fn range_contains(&self, item: &[u8]) -> bool {
712        (match &self.range_start {
713            Bound::Included(start) => start.as_slice() <= item,
714            Bound::Excluded(start) => start.as_slice() < item,
715            Bound::Unbounded => true,
716        }) && (match &self.range_end {
717            Bound::Included(end) => item <= end.as_ref(),
718            Bound::Excluded(end) => item < end.as_ref(),
719            Bound::Unbounded => true,
720        })
721    }
722}
723
724impl<K, V> Iterator for SimplifiedRange<K, V>
725where
726    for<'de> K: 'static + Clone + KeySerializer + Send,
727    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
728{
729    type Item = Result<(K, V)>;
730
731    fn next(&mut self) -> Option<Self::Item> {
732        while !self.exhausted && self.table_it.valid() {
733            if self
734                .table_it
735                .current(&mut self.current_key, &mut self.current_value)
736            {
737                if self.range_contains(&self.current_key) {
738                    let value: Option<V> = match self.serialization.deserialize(&self.current_value)
739                    {
740                        Ok(value) => value,
741                        Err(e) => return Some(Err(e.into())),
742                    };
743
744                    self.table_it.advance();
745
746                    if let Some(value) = value {
747                        let key = match K::parse_key(&self.current_key) {
748                            Ok(key) => key,
749                            Err(e) => return Some(Err(e.into())),
750                        };
751                        return Some(Ok((key, value)));
752                    }
753                } else {
754                    self.exhausted = true;
755                }
756            }
757        }
758        None
759    }
760}
761
762impl<K, V> FusedIterator for SimplifiedRange<K, V>
763where
764    K: 'static + Clone + KeySerializer + Send,
765    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
766{
767}
768
769#[cfg(test)]
770mod tests;