1use std::{
2 fs::File,
3 io::{Read, Seek, SeekFrom},
4 path::Path,
5};
6
7use fwob_core::{Key, KeyType, OwnedFrame};
8
9use crate::{
10 encoding::decode_page_payload,
11 file_header::{read_file_header, FileHeader},
12 page::PageHeader,
13 Result, V2Error,
14};
15
16pub struct Reader<R> {
17 inner: R,
18 header: FileHeader,
19 key_type: KeyType,
20 cached_page: Option<CachedPage>,
21}
22
23struct CachedPage {
24 index: u64,
25 header: PageHeader,
26 raw: Vec<u8>,
27}
28
29impl Reader<File> {
30 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
31 let file = File::open(path)?;
32 Self::new(file)
33 }
34}
35
36impl<R: Read + Seek> Reader<R> {
37 pub fn new(mut inner: R) -> Result<Self> {
38 let header = read_file_header(&mut inner)?;
39 let key_type = KeyType::from_field(header.schema.key_field())?;
40 Ok(Self {
41 inner,
42 header,
43 key_type,
44 cached_page: None,
45 })
46 }
47
48 pub fn header(&self) -> &FileHeader {
49 &self.header
50 }
51
52 pub fn read_page_header(&mut self, page_index: u64) -> Result<PageHeader> {
53 if page_index >= self.header.page_count {
54 return Err(V2Error::InvalidPageHeader(page_index));
55 }
56 self.inner
57 .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
58 PageHeader::read(&mut self.inner, page_index)
59 }
60
61 pub fn read_page_frames(&mut self, page_index: u64) -> Result<Vec<OwnedFrame>> {
62 self.load_page(page_index)?;
63 let raw = &self.cached_page.as_ref().expect("page loaded").raw;
64 let frame_len = self.header.schema.frame_len as usize;
65 let mut frames = Vec::with_capacity(raw.len() / frame_len);
66 for chunk in raw.chunks_exact(frame_len) {
67 frames.push(OwnedFrame::new(&self.header.schema, chunk.to_vec())?);
68 }
69 Ok(frames)
70 }
71
72 pub fn read_page_raw_frames(&mut self, page_index: u64) -> Result<Vec<u8>> {
73 self.load_page(page_index)?;
74 Ok(self.cached_page.as_ref().expect("page loaded").raw.clone())
75 }
76
77 pub fn read_frame_at(&mut self, index: u64) -> Result<Option<OwnedFrame>> {
78 let Some(page_index) = self.page_for_index_with_cache(index)? else {
79 return Ok(None);
80 };
81 self.load_page(page_index)?;
82 let cached = self.cached_page.as_ref().expect("page loaded");
83 let local_index = (index - cached.header.first_frame_index) as usize;
84 let frame_len = self.header.schema.frame_len as usize;
85 let offset = local_index * frame_len;
86 Ok(Some(OwnedFrame::new(
87 &self.header.schema,
88 cached.raw[offset..offset + frame_len].to_vec(),
89 )?))
90 }
91
92 pub fn first_frame(&mut self) -> Result<Option<OwnedFrame>> {
93 if self.header.page_count == 0 {
94 return Ok(None);
95 }
96 self.read_frame_from_page(0, 0).map(Some)
97 }
98
99 pub fn last_frame(&mut self) -> Result<Option<OwnedFrame>> {
100 if self.header.page_count == 0 {
101 return Ok(None);
102 }
103 let page_index = self.header.page_count - 1;
104 let page = self.read_page_header(page_index)?;
105 let local_index = page
106 .frame_count
107 .checked_sub(1)
108 .ok_or(V2Error::InvalidPageHeader(page_index))? as usize;
109 self.read_frame_from_page(page_index, local_index).map(Some)
110 }
111
112 pub fn read_key_at(&mut self, index: u64) -> Result<Option<Key>> {
113 let Some(page_index) = self.page_for_index_with_cache(index)? else {
114 return Ok(None);
115 };
116 self.load_page(page_index)?;
117 let cached = self.cached_page.as_ref().expect("page loaded");
118 let local_index = index - cached.header.first_frame_index;
119 Ok(Some(self.cached_key(local_index as usize)?))
120 }
121
122 pub fn first_key(&mut self) -> Result<Option<Key>> {
123 if self.header.page_count == 0 {
124 Ok(None)
125 } else {
126 Ok(Some(self.read_page_header(0)?.first_key))
127 }
128 }
129
130 pub fn last_key(&mut self) -> Result<Option<Key>> {
131 if self.header.page_count == 0 {
132 Ok(None)
133 } else {
134 Ok(Some(
135 self.read_page_header(self.header.page_count - 1)?.last_key,
136 ))
137 }
138 }
139
140 pub fn find_page_for_index(&mut self, index: u64) -> Result<Option<u64>> {
141 if index >= self.header.frame_count || self.header.page_count == 0 {
142 return Ok(None);
143 }
144 let mut lo = 0;
145 let mut hi = self.header.page_count;
146 while lo < hi {
147 let mid = lo + ((hi - lo) >> 1);
148 let page = self.read_page_header(mid)?;
149 if page.first_frame_index <= index {
150 lo = mid + 1;
151 } else {
152 hi = mid;
153 }
154 }
155 let page_index = lo.saturating_sub(1);
156 let page = self.read_page_header(page_index)?;
157 if index < page.first_frame_index + u64::from(page.frame_count) {
158 Ok(Some(page_index))
159 } else {
160 Err(V2Error::InvalidPageHeader(page_index))
161 }
162 }
163
164 fn page_for_index_with_cache(&mut self, index: u64) -> Result<Option<u64>> {
165 if index >= self.header.frame_count {
166 return Ok(None);
167 }
168 if let Some(cached) = &self.cached_page {
169 let start = cached.header.first_frame_index;
170 let end = start + u64::from(cached.header.frame_count);
171 if (start..end).contains(&index) {
172 return Ok(Some(cached.index));
173 }
174 if index == end && cached.index + 1 < self.header.page_count {
175 return Ok(Some(cached.index + 1));
176 }
177 }
178 self.find_page_for_index(index)
179 }
180
181 pub fn lower_bound(&mut self, key: Key) -> Result<u64> {
182 let mut lo = 0;
183 let mut hi = self.header.page_count;
184 while lo < hi {
185 let mid = lo + ((hi - lo) >> 1);
186 if self.read_page_header(mid)?.last_key < key {
187 lo = mid + 1;
188 } else {
189 hi = mid;
190 }
191 }
192 if lo == self.header.page_count {
193 return Ok(self.header.frame_count);
194 }
195 self.load_page(lo)?;
196 let cached = self.cached_page.as_ref().expect("page loaded");
197 let mut frame_lo = 0usize;
198 let mut frame_hi = cached.header.frame_count as usize;
199 while frame_lo < frame_hi {
200 let mid = frame_lo + ((frame_hi - frame_lo) >> 1);
201 if self.cached_key(mid)? < key {
202 frame_lo = mid + 1;
203 } else {
204 frame_hi = mid;
205 }
206 }
207 Ok(cached.header.first_frame_index + frame_lo as u64)
208 }
209
210 pub fn upper_bound(&mut self, key: Key) -> Result<u64> {
211 let mut lo = 0;
212 let mut hi = self.header.page_count;
213 while lo < hi {
214 let mid = lo + ((hi - lo) >> 1);
215 if self.read_page_header(mid)?.last_key <= key {
216 lo = mid + 1;
217 } else {
218 hi = mid;
219 }
220 }
221 if lo == self.header.page_count {
222 return Ok(self.header.frame_count);
223 }
224 self.load_page(lo)?;
225 let cached = self.cached_page.as_ref().expect("page loaded");
226 let mut frame_lo = 0usize;
227 let mut frame_hi = cached.header.frame_count as usize;
228 while frame_lo < frame_hi {
229 let mid = frame_lo + ((frame_hi - frame_lo) >> 1);
230 if self.cached_key(mid)? <= key {
231 frame_lo = mid + 1;
232 } else {
233 frame_hi = mid;
234 }
235 }
236 Ok(cached.header.first_frame_index + frame_lo as u64)
237 }
238
239 pub fn equal_range(&mut self, key: Key) -> Result<(u64, u64)> {
240 let Some(lower_page) = self.first_page_with_last_key(key, false)? else {
241 return Ok((self.header.frame_count, self.header.frame_count));
242 };
243 self.load_page(lower_page)?;
244 let lower = self.bound_in_cached_page(key, false)?;
245
246 let cached = self.cached_page.as_ref().expect("page loaded");
247 if cached.header.last_key > key {
248 let upper = self.bound_in_cached_page(key, true)?;
249 return Ok((lower, upper));
250 }
251
252 let Some(upper_page) = self.first_page_with_last_key_from(key, true, lower_page + 1)?
253 else {
254 return Ok((lower, self.header.frame_count));
255 };
256 self.load_page(upper_page)?;
257 let upper = self.bound_in_cached_page(key, true)?;
258 Ok((lower, upper))
259 }
260
261 fn first_page_with_last_key(&mut self, key: Key, strict: bool) -> Result<Option<u64>> {
262 self.first_page_with_last_key_from(key, strict, 0)
263 }
264
265 fn first_page_with_last_key_from(
266 &mut self,
267 key: Key,
268 strict: bool,
269 start: u64,
270 ) -> Result<Option<u64>> {
271 let mut lo = start;
272 let mut hi = self.header.page_count;
273 while lo < hi {
274 let mid = lo + ((hi - lo) >> 1);
275 let last_key = self.read_page_header(mid)?.last_key;
276 if last_key < key || (strict && last_key == key) {
277 lo = mid + 1;
278 } else {
279 hi = mid;
280 }
281 }
282 Ok((lo < self.header.page_count).then_some(lo))
283 }
284
285 fn bound_in_cached_page(&self, key: Key, upper: bool) -> Result<u64> {
286 let cached = self.cached_page.as_ref().expect("page loaded");
287 let mut lo = 0usize;
288 let mut hi = cached.header.frame_count as usize;
289 while lo < hi {
290 let mid = lo + ((hi - lo) >> 1);
291 let mid_key = self.cached_key(mid)?;
292 if mid_key < key || (upper && mid_key == key) {
293 lo = mid + 1;
294 } else {
295 hi = mid;
296 }
297 }
298 Ok(cached.header.first_frame_index + lo as u64)
299 }
300
301 fn load_page(&mut self, page_index: u64) -> Result<()> {
302 if self
303 .cached_page
304 .as_ref()
305 .is_some_and(|cached| cached.index == page_index)
306 {
307 return Ok(());
308 }
309 let page_header = self.read_page_header(page_index)?;
310 let mut compressed = vec![0u8; page_header.compressed_len as usize];
311 self.inner.read_exact(&mut compressed)?;
312 page_header.validate_payload(&compressed)?;
313 let encoded = page_header
314 .codec
315 .decompress(&compressed, page_header.uncompressed_len as usize)?;
316 let raw = decode_page_payload(
317 &self.header.schema,
318 page_header.encoding,
319 &encoded,
320 page_header.frame_count as usize,
321 )?;
322 self.cached_page = Some(CachedPage {
323 index: page_index,
324 header: page_header,
325 raw,
326 });
327 Ok(())
328 }
329
330 fn read_frame_from_page(&mut self, page_index: u64, local_index: usize) -> Result<OwnedFrame> {
331 self.load_page(page_index)?;
332 let cached = self.cached_page.as_ref().expect("page loaded");
333 let frame_len = self.header.schema.frame_len as usize;
334 let offset = local_index * frame_len;
335 OwnedFrame::new(
336 &self.header.schema,
337 cached.raw[offset..offset + frame_len].to_vec(),
338 )
339 .map_err(Into::into)
340 }
341
342 fn cached_key(&self, local_index: usize) -> Result<Key> {
343 let cached = self.cached_page.as_ref().expect("page loaded");
344 let frame_len = self.header.schema.frame_len as usize;
345 let key_field = self.header.schema.key_field();
346 let offset = local_index * frame_len + key_field.offset as usize;
347 let end = offset + key_field.length as usize;
348 Ok(Key::decode(self.key_type, &cached.raw[offset..end])?)
349 }
350
351 pub fn find_page_for_key(&mut self, key: Key) -> Result<Option<u64>> {
352 if self.header.page_count == 0 {
353 return Ok(None);
354 }
355 let mut lo = 0;
356 let mut hi = self.header.page_count;
357 while lo < hi {
358 let mid = lo + ((hi - lo) >> 1);
359 let page = self.read_page_header(mid)?;
360 if page.last_key < key {
361 lo = mid + 1;
362 } else {
363 hi = mid;
364 }
365 }
366 if lo >= self.header.page_count {
367 return Ok(None);
368 }
369 let page = self.read_page_header(lo)?;
370 if key < page.first_key {
371 Ok(None)
372 } else {
373 Ok(Some(lo))
374 }
375 }
376
377 pub fn frames_between(&mut self, first: Key, last: Key) -> Result<Vec<OwnedFrame>> {
378 if first > last {
379 return Ok(Vec::new());
380 }
381 let start = self.lower_bound(first)?;
382 let end = self.upper_bound(last)?;
383 let mut out = Vec::with_capacity((end - start) as usize);
384 for index in start..end {
385 out.push(self.read_frame_at(index)?.expect("index is in range"));
386 }
387 Ok(out)
388 }
389
390 pub fn read_all_frames(&mut self) -> Result<Vec<OwnedFrame>> {
391 let mut out = Vec::with_capacity(self.header.frame_count as usize);
392 for page_index in 0..self.header.page_count {
393 out.extend(self.read_page_frames(page_index)?);
394 }
395 Ok(out)
396 }
397
398 pub fn verify(&mut self) -> Result<()> {
399 let mut last_key = None;
400 let mut count = 0u64;
401 for page_index in 0..self.header.page_count {
402 let page = self.read_page_header(page_index)?;
403 if page.first_frame_index != count {
404 return Err(V2Error::InvalidPageHeader(page_index));
405 }
406 let frames = self.read_page_frames(page_index)?;
407 if frames.len() != page.frame_count as usize {
408 return Err(V2Error::InvalidPageHeader(page_index));
409 }
410 for frame in &frames {
411 let key = frame.as_ref().key(&self.header.schema, self.key_type)?;
412 if let Some(last) = last_key {
413 if key < last {
414 return Err(V2Error::KeyOrderViolation);
415 }
416 }
417 last_key = Some(key);
418 }
419 count += frames.len() as u64;
420 }
421 if count != self.header.frame_count {
422 return Err(V2Error::InvalidFileHeader);
423 }
424 Ok(())
425 }
426}