Skip to main content

fwob_v2/
reader.rs

1use std::{
2    fs::File,
3    io::{Read, Seek, SeekFrom},
4    path::Path,
5};
6
7use fwob_core::{Key, KeyType, OwnedFrame};
8
9use crate::{
10    encoding::decode_page_payload,
11    file_header::{read_file_header, FileHeader},
12    page::PageHeader,
13    Result, V2Error,
14};
15
/// Reader over a byte source `R`, holding the parsed file header, the key
/// type derived from the schema's key field, and at most one decoded page.
pub struct Reader<R> {
    /// Underlying byte source; repositioned with a seek before each page header read.
    inner: R,
    /// File header parsed once when the reader is constructed.
    header: FileHeader,
    /// Key type obtained from the schema's key field; used when decoding keys.
    key_type: KeyType,
    /// The most recently loaded page, if any (single-entry cache).
    cached_page: Option<CachedPage>,
}
22
/// A single decoded page kept by [`Reader`] so repeated accesses to the same
/// page do not re-read and re-decode it.
struct CachedPage {
    /// Index of the page within the file.
    index: u64,
    /// Header that was read for this page.
    header: PageHeader,
    /// Decoded page payload; sliced in `frame_len`-sized pieces by the reader.
    raw: Vec<u8>,
}
28
29impl Reader<File> {
30    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
31        let file = File::open(path)?;
32        Self::new(file)
33    }
34}
35
36impl<R: Read + Seek> Reader<R> {
37    pub fn new(mut inner: R) -> Result<Self> {
38        let header = read_file_header(&mut inner)?;
39        let key_type = KeyType::from_field(header.schema.key_field())?;
40        Ok(Self {
41            inner,
42            header,
43            key_type,
44            cached_page: None,
45        })
46    }
47
    /// Returns the file header that was parsed when this reader was constructed.
    pub fn header(&self) -> &FileHeader {
        &self.header
    }
51
52    pub fn read_page_header(&mut self, page_index: u64) -> Result<PageHeader> {
53        if page_index >= self.header.page_count {
54            return Err(V2Error::InvalidPageHeader(page_index));
55        }
56        self.inner
57            .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
58        PageHeader::read(&mut self.inner, page_index)
59    }
60
61    pub fn read_page_frames(&mut self, page_index: u64) -> Result<Vec<OwnedFrame>> {
62        self.load_page(page_index)?;
63        let raw = &self.cached_page.as_ref().expect("page loaded").raw;
64        let frame_len = self.header.schema.frame_len as usize;
65        let mut frames = Vec::with_capacity(raw.len() / frame_len);
66        for chunk in raw.chunks_exact(frame_len) {
67            frames.push(OwnedFrame::new(&self.header.schema, chunk.to_vec())?);
68        }
69        Ok(frames)
70    }
71
72    pub fn read_page_raw_frames(&mut self, page_index: u64) -> Result<Vec<u8>> {
73        self.load_page(page_index)?;
74        Ok(self.cached_page.as_ref().expect("page loaded").raw.clone())
75    }
76
77    pub fn read_frame_at(&mut self, index: u64) -> Result<Option<OwnedFrame>> {
78        let Some(page_index) = self.page_for_index_with_cache(index)? else {
79            return Ok(None);
80        };
81        self.load_page(page_index)?;
82        let cached = self.cached_page.as_ref().expect("page loaded");
83        let local_index = (index - cached.header.first_frame_index) as usize;
84        let frame_len = self.header.schema.frame_len as usize;
85        let offset = local_index * frame_len;
86        Ok(Some(OwnedFrame::new(
87            &self.header.schema,
88            cached.raw[offset..offset + frame_len].to_vec(),
89        )?))
90    }
91
92    pub fn first_frame(&mut self) -> Result<Option<OwnedFrame>> {
93        if self.header.page_count == 0 {
94            return Ok(None);
95        }
96        self.read_frame_from_page(0, 0).map(Some)
97    }
98
99    pub fn last_frame(&mut self) -> Result<Option<OwnedFrame>> {
100        if self.header.page_count == 0 {
101            return Ok(None);
102        }
103        let page_index = self.header.page_count - 1;
104        let page = self.read_page_header(page_index)?;
105        let local_index = page
106            .frame_count
107            .checked_sub(1)
108            .ok_or(V2Error::InvalidPageHeader(page_index))? as usize;
109        self.read_frame_from_page(page_index, local_index).map(Some)
110    }
111
112    pub fn read_key_at(&mut self, index: u64) -> Result<Option<Key>> {
113        let Some(page_index) = self.page_for_index_with_cache(index)? else {
114            return Ok(None);
115        };
116        self.load_page(page_index)?;
117        let cached = self.cached_page.as_ref().expect("page loaded");
118        let local_index = index - cached.header.first_frame_index;
119        Ok(Some(self.cached_key(local_index as usize)?))
120    }
121
122    pub fn first_key(&mut self) -> Result<Option<Key>> {
123        if self.header.page_count == 0 {
124            Ok(None)
125        } else {
126            Ok(Some(self.read_page_header(0)?.first_key))
127        }
128    }
129
130    pub fn last_key(&mut self) -> Result<Option<Key>> {
131        if self.header.page_count == 0 {
132            Ok(None)
133        } else {
134            Ok(Some(
135                self.read_page_header(self.header.page_count - 1)?.last_key,
136            ))
137        }
138    }
139
140    pub fn find_page_for_index(&mut self, index: u64) -> Result<Option<u64>> {
141        if index >= self.header.frame_count || self.header.page_count == 0 {
142            return Ok(None);
143        }
144        let mut lo = 0;
145        let mut hi = self.header.page_count;
146        while lo < hi {
147            let mid = lo + ((hi - lo) >> 1);
148            let page = self.read_page_header(mid)?;
149            if page.first_frame_index <= index {
150                lo = mid + 1;
151            } else {
152                hi = mid;
153            }
154        }
155        let page_index = lo.saturating_sub(1);
156        let page = self.read_page_header(page_index)?;
157        if index < page.first_frame_index + u64::from(page.frame_count) {
158            Ok(Some(page_index))
159        } else {
160            Err(V2Error::InvalidPageHeader(page_index))
161        }
162    }
163
164    fn page_for_index_with_cache(&mut self, index: u64) -> Result<Option<u64>> {
165        if index >= self.header.frame_count {
166            return Ok(None);
167        }
168        if let Some(cached) = &self.cached_page {
169            let start = cached.header.first_frame_index;
170            let end = start + u64::from(cached.header.frame_count);
171            if (start..end).contains(&index) {
172                return Ok(Some(cached.index));
173            }
174            if index == end && cached.index + 1 < self.header.page_count {
175                return Ok(Some(cached.index + 1));
176            }
177        }
178        self.find_page_for_index(index)
179    }
180
181    pub fn lower_bound(&mut self, key: Key) -> Result<u64> {
182        let mut lo = 0;
183        let mut hi = self.header.page_count;
184        while lo < hi {
185            let mid = lo + ((hi - lo) >> 1);
186            if self.read_page_header(mid)?.last_key < key {
187                lo = mid + 1;
188            } else {
189                hi = mid;
190            }
191        }
192        if lo == self.header.page_count {
193            return Ok(self.header.frame_count);
194        }
195        self.load_page(lo)?;
196        let cached = self.cached_page.as_ref().expect("page loaded");
197        let mut frame_lo = 0usize;
198        let mut frame_hi = cached.header.frame_count as usize;
199        while frame_lo < frame_hi {
200            let mid = frame_lo + ((frame_hi - frame_lo) >> 1);
201            if self.cached_key(mid)? < key {
202                frame_lo = mid + 1;
203            } else {
204                frame_hi = mid;
205            }
206        }
207        Ok(cached.header.first_frame_index + frame_lo as u64)
208    }
209
210    pub fn upper_bound(&mut self, key: Key) -> Result<u64> {
211        let mut lo = 0;
212        let mut hi = self.header.page_count;
213        while lo < hi {
214            let mid = lo + ((hi - lo) >> 1);
215            if self.read_page_header(mid)?.last_key <= key {
216                lo = mid + 1;
217            } else {
218                hi = mid;
219            }
220        }
221        if lo == self.header.page_count {
222            return Ok(self.header.frame_count);
223        }
224        self.load_page(lo)?;
225        let cached = self.cached_page.as_ref().expect("page loaded");
226        let mut frame_lo = 0usize;
227        let mut frame_hi = cached.header.frame_count as usize;
228        while frame_lo < frame_hi {
229            let mid = frame_lo + ((frame_hi - frame_lo) >> 1);
230            if self.cached_key(mid)? <= key {
231                frame_lo = mid + 1;
232            } else {
233                frame_hi = mid;
234            }
235        }
236        Ok(cached.header.first_frame_index + frame_lo as u64)
237    }
238
239    pub fn equal_range(&mut self, key: Key) -> Result<(u64, u64)> {
240        let Some(lower_page) = self.first_page_with_last_key(key, false)? else {
241            return Ok((self.header.frame_count, self.header.frame_count));
242        };
243        self.load_page(lower_page)?;
244        let lower = self.bound_in_cached_page(key, false)?;
245
246        let cached = self.cached_page.as_ref().expect("page loaded");
247        if cached.header.last_key > key {
248            let upper = self.bound_in_cached_page(key, true)?;
249            return Ok((lower, upper));
250        }
251
252        let Some(upper_page) = self.first_page_with_last_key_from(key, true, lower_page + 1)?
253        else {
254            return Ok((lower, self.header.frame_count));
255        };
256        self.load_page(upper_page)?;
257        let upper = self.bound_in_cached_page(key, true)?;
258        Ok((lower, upper))
259    }
260
    /// Convenience wrapper around `first_page_with_last_key_from` that starts
    /// the search at page 0.
    fn first_page_with_last_key(&mut self, key: Key, strict: bool) -> Result<Option<u64>> {
        self.first_page_with_last_key_from(key, strict, 0)
    }
264
265    fn first_page_with_last_key_from(
266        &mut self,
267        key: Key,
268        strict: bool,
269        start: u64,
270    ) -> Result<Option<u64>> {
271        let mut lo = start;
272        let mut hi = self.header.page_count;
273        while lo < hi {
274            let mid = lo + ((hi - lo) >> 1);
275            let last_key = self.read_page_header(mid)?.last_key;
276            if last_key < key || (strict && last_key == key) {
277                lo = mid + 1;
278            } else {
279                hi = mid;
280            }
281        }
282        Ok((lo < self.header.page_count).then_some(lo))
283    }
284
285    fn bound_in_cached_page(&self, key: Key, upper: bool) -> Result<u64> {
286        let cached = self.cached_page.as_ref().expect("page loaded");
287        let mut lo = 0usize;
288        let mut hi = cached.header.frame_count as usize;
289        while lo < hi {
290            let mid = lo + ((hi - lo) >> 1);
291            let mid_key = self.cached_key(mid)?;
292            if mid_key < key || (upper && mid_key == key) {
293                lo = mid + 1;
294            } else {
295                hi = mid;
296            }
297        }
298        Ok(cached.header.first_frame_index + lo as u64)
299    }
300
301    fn load_page(&mut self, page_index: u64) -> Result<()> {
302        if self
303            .cached_page
304            .as_ref()
305            .is_some_and(|cached| cached.index == page_index)
306        {
307            return Ok(());
308        }
309        let page_header = self.read_page_header(page_index)?;
310        let mut compressed = vec![0u8; page_header.compressed_len as usize];
311        self.inner.read_exact(&mut compressed)?;
312        page_header.validate_payload(&compressed)?;
313        let encoded = page_header
314            .codec
315            .decompress(&compressed, page_header.uncompressed_len as usize)?;
316        let raw = decode_page_payload(
317            &self.header.schema,
318            page_header.encoding,
319            &encoded,
320            page_header.frame_count as usize,
321        )?;
322        self.cached_page = Some(CachedPage {
323            index: page_index,
324            header: page_header,
325            raw,
326        });
327        Ok(())
328    }
329
330    fn read_frame_from_page(&mut self, page_index: u64, local_index: usize) -> Result<OwnedFrame> {
331        self.load_page(page_index)?;
332        let cached = self.cached_page.as_ref().expect("page loaded");
333        let frame_len = self.header.schema.frame_len as usize;
334        let offset = local_index * frame_len;
335        OwnedFrame::new(
336            &self.header.schema,
337            cached.raw[offset..offset + frame_len].to_vec(),
338        )
339        .map_err(Into::into)
340    }
341
342    fn cached_key(&self, local_index: usize) -> Result<Key> {
343        let cached = self.cached_page.as_ref().expect("page loaded");
344        let frame_len = self.header.schema.frame_len as usize;
345        let key_field = self.header.schema.key_field();
346        let offset = local_index * frame_len + key_field.offset as usize;
347        let end = offset + key_field.length as usize;
348        Ok(Key::decode(self.key_type, &cached.raw[offset..end])?)
349    }
350
351    pub fn find_page_for_key(&mut self, key: Key) -> Result<Option<u64>> {
352        if self.header.page_count == 0 {
353            return Ok(None);
354        }
355        let mut lo = 0;
356        let mut hi = self.header.page_count;
357        while lo < hi {
358            let mid = lo + ((hi - lo) >> 1);
359            let page = self.read_page_header(mid)?;
360            if page.last_key < key {
361                lo = mid + 1;
362            } else {
363                hi = mid;
364            }
365        }
366        if lo >= self.header.page_count {
367            return Ok(None);
368        }
369        let page = self.read_page_header(lo)?;
370        if key < page.first_key {
371            Ok(None)
372        } else {
373            Ok(Some(lo))
374        }
375    }
376
377    pub fn frames_between(&mut self, first: Key, last: Key) -> Result<Vec<OwnedFrame>> {
378        if first > last {
379            return Ok(Vec::new());
380        }
381        let start = self.lower_bound(first)?;
382        let end = self.upper_bound(last)?;
383        let mut out = Vec::with_capacity((end - start) as usize);
384        for index in start..end {
385            out.push(self.read_frame_at(index)?.expect("index is in range"));
386        }
387        Ok(out)
388    }
389
390    pub fn read_all_frames(&mut self) -> Result<Vec<OwnedFrame>> {
391        let mut out = Vec::with_capacity(self.header.frame_count as usize);
392        for page_index in 0..self.header.page_count {
393            out.extend(self.read_page_frames(page_index)?);
394        }
395        Ok(out)
396    }
397
398    pub fn verify(&mut self) -> Result<()> {
399        let mut last_key = None;
400        let mut count = 0u64;
401        for page_index in 0..self.header.page_count {
402            let page = self.read_page_header(page_index)?;
403            if page.first_frame_index != count {
404                return Err(V2Error::InvalidPageHeader(page_index));
405            }
406            let frames = self.read_page_frames(page_index)?;
407            if frames.len() != page.frame_count as usize {
408                return Err(V2Error::InvalidPageHeader(page_index));
409            }
410            for frame in &frames {
411                let key = frame.as_ref().key(&self.header.schema, self.key_type)?;
412                if let Some(last) = last_key {
413                    if key < last {
414                        return Err(V2Error::KeyOrderViolation);
415                    }
416                }
417                last_key = Some(key);
418            }
419            count += frames.len() as u64;
420        }
421        if count != self.header.frame_count {
422            return Err(V2Error::InvalidFileHeader);
423        }
424        Ok(())
425    }
426}