lsm_tree/segment/
reader.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{
6    block::offset::BlockOffset,
7    value_block::{CachePolicy, ValueBlock},
8    value_block_consumer::ValueBlockConsumer,
9};
10use crate::{
11    cache::Cache, descriptor_table::FileDescriptorTable, segment::block::header::Header,
12    value::InternalValue, GlobalSegmentId, UserKey,
13};
14use std::sync::Arc;
15
/// Double-ended reader over a segment's data blocks.
///
/// Maintains two independent cursors: a forward ("lo") cursor and a
/// backward ("hi") cursor, which meet somewhere in the middle when the
/// reader is consumed from both ends.
pub struct Reader {
    // Identifies the segment this reader scans (used for block lookups).
    segment_id: GlobalSegmentId,

    // File descriptor table used to access the segment file on disk.
    descriptor_table: Arc<FileDescriptorTable>,
    // Block cache consulted (and possibly populated) when loading value blocks.
    block_cache: Arc<Cache>,

    // Exclusive end offset of the data block section; forward iteration
    // stops once the next block offset reaches this boundary.
    data_block_boundary: BlockOffset,

    // Offset of the block the forward cursor currently points at.
    pub lo_block_offset: BlockOffset,
    // Data length of the current lo block; used together with the header
    // size to compute the next block's offset.
    pub(crate) lo_block_size: u64,
    // Remaining items of the current lo block, if one is loaded.
    pub(crate) lo_block_items: Option<ValueBlockConsumer>,
    // Whether the lo cursor has performed its initial block load.
    pub(crate) lo_initialized: bool,

    // Offset of the block the backward cursor points at; `None` if no
    // upper block was configured for this reader.
    pub hi_block_offset: Option<BlockOffset>,
    // Offset of the block preceding the current hi block, taken from the
    // loaded block's header (`previous_block_offset`).
    pub hi_block_backlink: BlockOffset,
    // Remaining items of the current hi block, if one is loaded.
    pub hi_block_items: Option<ValueBlockConsumer>,
    // Whether the hi cursor has performed its initial block load.
    pub hi_initialized: bool,

    // Optional inclusive key bounds used to truncate items when a block
    // is loaded (see `ValueBlockConsumer::with_bounds`).
    start_key: Option<UserKey>,
    end_key: Option<UserKey>,

    // Cache policy applied when loading blocks (defaults to `Write`).
    cache_policy: CachePolicy,
}
39
40impl Reader {
41    #[must_use]
42    pub fn new(
43        data_block_boundary: BlockOffset,
44        descriptor_table: Arc<FileDescriptorTable>,
45        segment_id: GlobalSegmentId,
46        block_cache: Arc<Cache>,
47        lo_block_offset: BlockOffset,
48        hi_block_offset: Option<BlockOffset>,
49    ) -> Self {
50        Self {
51            data_block_boundary,
52
53            descriptor_table,
54            segment_id,
55            block_cache,
56
57            lo_block_offset,
58            lo_block_size: 0,
59            lo_block_items: None,
60            lo_initialized: false,
61
62            hi_block_offset,
63            hi_block_backlink: BlockOffset(0),
64            hi_block_items: None,
65            hi_initialized: false,
66
67            cache_policy: CachePolicy::Write,
68
69            start_key: None,
70            end_key: None,
71        }
72    }
73
74    /// Sets the lower bound block, such that as many blocks as possible can be skipped.
75    pub fn set_lower_bound(&mut self, key: UserKey) {
76        self.start_key = Some(key);
77    }
78
79    /// Sets the upper bound block, such that as many blocks as possible can be skipped.
80    pub fn set_upper_bound(&mut self, key: UserKey) {
81        self.end_key = Some(key);
82    }
83
84    /// Sets the cache policy
85    #[must_use]
86    pub fn cache_policy(mut self, policy: CachePolicy) -> Self {
87        self.cache_policy = policy;
88        self
89    }
90
91    fn load_data_block(
92        &self,
93        offset: BlockOffset,
94    ) -> crate::Result<Option<(u64, BlockOffset, ValueBlockConsumer)>> {
95        let block = ValueBlock::load_by_block_handle(
96            &self.descriptor_table,
97            &self.block_cache,
98            self.segment_id,
99            offset,
100            self.cache_policy,
101        )?;
102
103        // TODO: we only need to truncate items from blocks that are not the first and last block
104        // TODO: because any block inbetween must (trivially) only contain relevant items
105
106        // Truncate as many items as possible
107        block.map_or(Ok(None), |block| {
108            Ok(Some((
109                block.header.data_length.into(),
110                block.header.previous_block_offset,
111                ValueBlockConsumer::with_bounds(
112                    block,
113                    self.start_key.as_deref(),
114                    self.end_key.as_deref(),
115                ),
116            )))
117        })
118    }
119
120    fn initialize_lo(&mut self) -> crate::Result<()> {
121        if let Some((size, _, items)) = self.load_data_block(self.lo_block_offset)? {
122            self.lo_block_items = Some(items);
123            self.lo_block_size = size;
124        }
125
126        self.lo_initialized = true;
127
128        Ok(())
129    }
130
131    fn initialize_hi(&mut self) -> crate::Result<()> {
132        let offset = self
133            .hi_block_offset
134            .expect("no hi offset configured for segment reader");
135
136        if let Some((_, backlink, items)) = self.load_data_block(offset)? {
137            self.hi_block_items = Some(items);
138            self.hi_block_backlink = backlink;
139        }
140
141        self.hi_initialized = true;
142
143        Ok(())
144    }
145}
146
impl Iterator for Reader {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        // Lazily perform the first block load on the first call.
        if !self.lo_initialized {
            fail_iter!(self.initialize_lo());
        }

        // NOTE: If initialization found no block at lo_block_offset,
        // `as_mut()?` terminates the iterator here.
        if let Some(head) = self.lo_block_items.as_mut()?.next() {
            // Just consume item
            return Some(Ok(head));
        }

        // Front buffer is empty

        // Load next block
        // Blocks are laid out contiguously: next offset = current offset
        // + header size + current block's data length.
        let next_block_offset = BlockOffset(
            *self.lo_block_offset + Header::serialized_len() as u64 + self.lo_block_size,
        );

        // A zero-sized step would loop forever; that indicates a corrupt state.
        assert_ne!(
            self.lo_block_offset, next_block_offset,
            "invalid next block offset"
        );

        if next_block_offset >= self.data_block_boundary {
            // We are done
            return None;
        }

        // If the forward cursor has caught up with the backward cursor's
        // block, drain the remaining hi items instead of loading the block
        // again (it may already be partially consumed by next_back()).
        if let Some(hi_offset) = self.hi_block_offset {
            if next_block_offset == hi_offset {
                if !self.hi_initialized {
                    fail_iter!(self.initialize_hi());
                }

                // We reached the last block, consume from it instead
                return self.hi_block_items.as_mut()?.next().map(Ok);
            }
        }

        // TODO: when loading the next data block, we unnecessarily do binary search through it
        // (ValueBlock::with_bounds), but we may be able to skip it sometimes
        match fail_iter!(self.load_data_block(next_block_offset)) {
            Some((size, _, items)) => {
                // Advance the forward cursor to the freshly loaded block.
                self.lo_block_items = Some(items);
                self.lo_block_size = size;
                self.lo_block_offset = next_block_offset;

                // We just loaded the block
                self.lo_block_items.as_mut()?.next().map(Ok)
            }
            None => {
                // The computed offset was inside the data section but no
                // block was found there — broken invariant, so panic.
                panic!("searched for invalid data block");
            }
        }
    }
}
205
impl DoubleEndedIterator for Reader {
    fn next_back(&mut self) -> Option<Self::Item> {
        // Lazily perform the first block load on the first call.
        if !self.hi_initialized {
            fail_iter!(self.initialize_hi());
        }

        // Loop because an exhausted hi block makes us walk one block
        // backwards (via the backlink) and retry.
        loop {
            // NOTE: See init function
            let hi_offset = self
                .hi_block_offset
                .expect("no hi offset configured for segment reader");

            // Backward cursor has met the forward cursor's block: drain the
            // remaining lo items instead of reloading that block (it may
            // already be partially consumed by next()).
            if hi_offset == self.lo_block_offset {
                if !self.lo_initialized {
                    fail_iter!(self.initialize_lo());
                }

                // We reached the last block, consume from it instead
                return self.lo_block_items.as_mut()?.next_back().map(Ok);
            }

            if let Some(tail) = self.hi_block_items.as_mut()?.next_back() {
                // Just consume item
                return Some(Ok(tail));
            }

            // Back buffer is empty

            // Offset 0 means this was the segment's first block — nothing
            // precedes it.
            if hi_offset == BlockOffset(0) {
                // We are done
                return None;
            }

            // Load prev block
            let prev_block_offset = self.hi_block_backlink;

            // Same meeting-point check as above, but for the block we are
            // about to step back into.
            if prev_block_offset == self.lo_block_offset {
                if !self.lo_initialized {
                    fail_iter!(self.initialize_lo());
                }

                // We reached the last block, consume from it instead
                return self.lo_block_items.as_mut()?.next_back().map(Ok);
            }

            // TODO: when loading the next data block, we unnecessarily do binary search through it
            // (ValueBlock::with_bounds), but we may be able to skip it sometimes
            match fail_iter!(self.load_data_block(prev_block_offset)) {
                Some((_, backlink, items)) => {
                    // Step the backward cursor onto the previous block.
                    self.hi_block_items = Some(items);
                    self.hi_block_backlink = backlink;
                    self.hi_block_offset = Some(prev_block_offset);

                    // We just loaded the block
                    if let Some(item) = self.hi_block_items.as_mut()?.next_back() {
                        return Some(Ok(item));
                    }
                    // Block was loaded but its bounded consumer is already
                    // empty — keep walking backwards.
                }
                None => {
                    // Backlink pointed at an offset with no block — broken
                    // invariant, so panic.
                    panic!("searched for invalid data block");
                }
            }
        }
    }
}