lsm_tree/segment/
reader.rs1use super::{
6 block::offset::BlockOffset,
7 value_block::{CachePolicy, ValueBlock},
8 value_block_consumer::ValueBlockConsumer,
9};
10use crate::{
11 cache::Cache, descriptor_table::FileDescriptorTable, segment::block::header::Header,
12 value::InternalValue, GlobalSegmentId, UserKey,
13};
14use std::sync::Arc;
15
16pub struct Reader {
17 segment_id: GlobalSegmentId,
18
19 descriptor_table: Arc<FileDescriptorTable>,
20 block_cache: Arc<Cache>,
21
22 data_block_boundary: BlockOffset,
23
24 pub lo_block_offset: BlockOffset,
25 pub(crate) lo_block_size: u64,
26 pub(crate) lo_block_items: Option<ValueBlockConsumer>,
27 pub(crate) lo_initialized: bool,
28
29 pub hi_block_offset: Option<BlockOffset>,
30 pub hi_block_backlink: BlockOffset,
31 pub hi_block_items: Option<ValueBlockConsumer>,
32 pub hi_initialized: bool,
33
34 start_key: Option<UserKey>,
35 end_key: Option<UserKey>,
36
37 cache_policy: CachePolicy,
38}
39
40impl Reader {
41 #[must_use]
42 pub fn new(
43 data_block_boundary: BlockOffset,
44 descriptor_table: Arc<FileDescriptorTable>,
45 segment_id: GlobalSegmentId,
46 block_cache: Arc<Cache>,
47 lo_block_offset: BlockOffset,
48 hi_block_offset: Option<BlockOffset>,
49 ) -> Self {
50 Self {
51 data_block_boundary,
52
53 descriptor_table,
54 segment_id,
55 block_cache,
56
57 lo_block_offset,
58 lo_block_size: 0,
59 lo_block_items: None,
60 lo_initialized: false,
61
62 hi_block_offset,
63 hi_block_backlink: BlockOffset(0),
64 hi_block_items: None,
65 hi_initialized: false,
66
67 cache_policy: CachePolicy::Write,
68
69 start_key: None,
70 end_key: None,
71 }
72 }
73
74 pub fn set_lower_bound(&mut self, key: UserKey) {
76 self.start_key = Some(key);
77 }
78
79 pub fn set_upper_bound(&mut self, key: UserKey) {
81 self.end_key = Some(key);
82 }
83
84 #[must_use]
86 pub fn cache_policy(mut self, policy: CachePolicy) -> Self {
87 self.cache_policy = policy;
88 self
89 }
90
91 fn load_data_block(
92 &self,
93 offset: BlockOffset,
94 ) -> crate::Result<Option<(u64, BlockOffset, ValueBlockConsumer)>> {
95 let block = ValueBlock::load_by_block_handle(
96 &self.descriptor_table,
97 &self.block_cache,
98 self.segment_id,
99 offset,
100 self.cache_policy,
101 )?;
102
103 block.map_or(Ok(None), |block| {
108 Ok(Some((
109 block.header.data_length.into(),
110 block.header.previous_block_offset,
111 ValueBlockConsumer::with_bounds(
112 block,
113 self.start_key.as_deref(),
114 self.end_key.as_deref(),
115 ),
116 )))
117 })
118 }
119
120 fn initialize_lo(&mut self) -> crate::Result<()> {
121 if let Some((size, _, items)) = self.load_data_block(self.lo_block_offset)? {
122 self.lo_block_items = Some(items);
123 self.lo_block_size = size;
124 }
125
126 self.lo_initialized = true;
127
128 Ok(())
129 }
130
131 fn initialize_hi(&mut self) -> crate::Result<()> {
132 let offset = self
133 .hi_block_offset
134 .expect("no hi offset configured for segment reader");
135
136 if let Some((_, backlink, items)) = self.load_data_block(offset)? {
137 self.hi_block_items = Some(items);
138 self.hi_block_backlink = backlink;
139 }
140
141 self.hi_initialized = true;
142
143 Ok(())
144 }
145}
146
147impl Iterator for Reader {
148 type Item = crate::Result<InternalValue>;
149
150 fn next(&mut self) -> Option<Self::Item> {
151 if !self.lo_initialized {
152 fail_iter!(self.initialize_lo());
153 }
154
155 if let Some(head) = self.lo_block_items.as_mut()?.next() {
156 return Some(Ok(head));
158 }
159
160 let next_block_offset = BlockOffset(
164 *self.lo_block_offset + Header::serialized_len() as u64 + self.lo_block_size,
165 );
166
167 assert_ne!(
168 self.lo_block_offset, next_block_offset,
169 "invalid next block offset"
170 );
171
172 if next_block_offset >= self.data_block_boundary {
173 return None;
175 }
176
177 if let Some(hi_offset) = self.hi_block_offset {
178 if next_block_offset == hi_offset {
179 if !self.hi_initialized {
180 fail_iter!(self.initialize_hi());
181 }
182
183 return self.hi_block_items.as_mut()?.next().map(Ok);
185 }
186 }
187
188 match fail_iter!(self.load_data_block(next_block_offset)) {
191 Some((size, _, items)) => {
192 self.lo_block_items = Some(items);
193 self.lo_block_size = size;
194 self.lo_block_offset = next_block_offset;
195
196 self.lo_block_items.as_mut()?.next().map(Ok)
198 }
199 None => {
200 panic!("searched for invalid data block");
201 }
202 }
203 }
204}
205
206impl DoubleEndedIterator for Reader {
207 fn next_back(&mut self) -> Option<Self::Item> {
208 if !self.hi_initialized {
209 fail_iter!(self.initialize_hi());
210 }
211
212 loop {
213 let hi_offset = self
215 .hi_block_offset
216 .expect("no hi offset configured for segment reader");
217
218 if hi_offset == self.lo_block_offset {
219 if !self.lo_initialized {
220 fail_iter!(self.initialize_lo());
221 }
222
223 return self.lo_block_items.as_mut()?.next_back().map(Ok);
225 }
226
227 if let Some(tail) = self.hi_block_items.as_mut()?.next_back() {
228 return Some(Ok(tail));
230 }
231
232 if hi_offset == BlockOffset(0) {
235 return None;
237 }
238
239 let prev_block_offset = self.hi_block_backlink;
241
242 if prev_block_offset == self.lo_block_offset {
243 if !self.lo_initialized {
244 fail_iter!(self.initialize_lo());
245 }
246
247 return self.lo_block_items.as_mut()?.next_back().map(Ok);
249 }
250
251 match fail_iter!(self.load_data_block(prev_block_offset)) {
254 Some((_, backlink, items)) => {
255 self.hi_block_items = Some(items);
256 self.hi_block_backlink = backlink;
257 self.hi_block_offset = Some(prev_block_offset);
258
259 if let Some(item) = self.hi_block_items.as_mut()?.next_back() {
261 return Some(Ok(item));
262 }
263 }
264 None => {
265 panic!("searched for invalid data block");
266 }
267 }
268 }
269 }
270}