Skip to main content

rvf_runtime/
read_path.rs

1//! Progressive read logic for the RVF runtime.
2//!
3//! Boot sequence:
4//! 1. Seek to EOF - 4096, parse Level 0 root manifest
5//! 2. Extract hotset pointers, mmap hot segments
6//! 3. Background: parse Level 1 -> full segment directory
7//! 4. On-demand: load cold segments as queries need them
8
9use rvf_types::{FileIdentity, SegmentHeader, SegmentType, SEGMENT_HEADER_SIZE, SEGMENT_MAGIC};
10use std::collections::HashMap;
11use std::io::{self, Read, Seek, SeekFrom};
12
/// One entry of the segment directory carried inside a manifest payload.
///
/// The manifest parser in this module decodes each entry from a 25-byte
/// little-endian record: three `u64` fields followed by a one-byte type tag.
#[derive(Debug, Clone)]
pub(crate) struct SegDirEntry {
    /// Identifier of the segment this entry describes.
    pub seg_id: u64,
    /// Offset recorded for the segment.
    /// NOTE(review): presumably the absolute file position of the segment
    /// header — confirm against the write path.
    pub offset: u64,
    /// Payload length, in bytes, as recorded in the directory.
    pub payload_length: u64,
    /// Raw segment-type byte.
    /// NOTE(review): presumably a `SegmentType` discriminant — confirm.
    pub seg_type: u8,
}
21
22/// Parsed manifest data from the file.
23#[derive(Clone, Debug)]
24#[allow(dead_code)]
25pub(crate) struct ParsedManifest {
26    pub epoch: u32,
27    pub dimension: u16,
28    pub total_vectors: u64,
29    pub profile_id: u8,
30    pub segment_dir: Vec<SegDirEntry>,
31    pub deleted_ids: Vec<u64>,
32    pub file_identity: Option<FileIdentity>,
33}
34
/// In-memory store for vectors loaded from VEC_SEG segments, keyed by vector id.
#[allow(dead_code)]
pub(crate) struct VectorData {
    /// Vector id -> components. Entries are expected to hold `dimension`
    /// values, but nothing in this type enforces that.
    pub vectors: HashMap<u64, Vec<f32>>,
    /// Dimension supplied when the store was created.
    pub dimension: u16,
}

impl VectorData {
    /// Creates an empty store that remembers the given `dimension`.
    pub(crate) fn new(dimension: u16) -> Self {
        let vectors = HashMap::new();
        VectorData { vectors, dimension }
    }

    /// Returns the components stored under `id`, or `None` if the id is unknown.
    pub(crate) fn get(&self, id: u64) -> Option<&[f32]> {
        self.vectors.get(&id).map(Vec::as_slice)
    }

    /// Stores `data` under `id`, replacing any vector previously stored there.
    pub(crate) fn insert(&mut self, id: u64, data: Vec<f32>) {
        let _ = self.vectors.insert(id, data);
    }

    /// Removes the vector stored under `id`; unknown ids are ignored.
    pub(crate) fn remove(&mut self, id: u64) {
        let _ = self.vectors.remove(&id);
    }

    /// Number of vectors currently held.
    pub(crate) fn len(&self) -> usize {
        let map = &self.vectors;
        map.len()
    }

    /// Iterates over the stored vector ids in the map's (unspecified) order.
    pub(crate) fn ids(&self) -> impl Iterator<Item = &u64> {
        let map = &self.vectors;
        map.keys()
    }
}
71
72/// Scan backwards from EOF to find and parse the latest valid manifest.
73///
74/// Reads a tail chunk and scans byte-by-byte for the magic + manifest-type
75/// pattern, since segment headers are NOT necessarily 64-byte aligned from EOF.
76pub(crate) fn find_latest_manifest<R: Read + Seek>(reader: &mut R) -> io::Result<Option<ParsedManifest>> {
77    let file_size = reader.seek(SeekFrom::End(0))?;
78    if file_size < SEGMENT_HEADER_SIZE as u64 {
79        return Ok(None);
80    }
81
82    // Read up to 64 KB from the tail of the file into memory for scanning.
83    // The manifest is typically ~4 KB, so 64 KB gives 16x headroom.
84    let scan_size = std::cmp::min(file_size, 65_536) as usize;
85    let scan_start = file_size - scan_size as u64;
86    reader.seek(SeekFrom::Start(scan_start))?;
87    let mut buf = vec![0u8; scan_size];
88    reader.read_exact(&mut buf)?;
89
90    let magic_bytes = SEGMENT_MAGIC.to_le_bytes();
91    let manifest_type = SegmentType::Manifest as u8;
92
93    // Scan backwards through the buffer looking for magic + manifest type.
94    // We need at least SEGMENT_HEADER_SIZE bytes from the candidate position.
95    if buf.len() < SEGMENT_HEADER_SIZE {
96        return Ok(None);
97    }
98
99    let last_possible = buf.len() - SEGMENT_HEADER_SIZE;
100    for i in (0..=last_possible).rev() {
101        if buf[i..i + 4] == magic_bytes && buf[i + 5] == manifest_type {
102            // Found a candidate manifest header at offset `i` within the buffer.
103            let hdr_buf = &buf[i..i + SEGMENT_HEADER_SIZE];
104            let payload_length_u64 = u64::from_le_bytes([
105                hdr_buf[0x10], hdr_buf[0x11], hdr_buf[0x12], hdr_buf[0x13],
106                hdr_buf[0x14], hdr_buf[0x15], hdr_buf[0x16], hdr_buf[0x17],
107            ]);
108
109            // Reject implausible payload lengths to prevent OOM.
110            if payload_length_u64 > MAX_READ_PAYLOAD {
111                continue;
112            }
113            let payload_length = payload_length_u64 as usize;
114
115            let payload_start = i + SEGMENT_HEADER_SIZE;
116            let payload_end = match payload_start.checked_add(payload_length) {
117                Some(end) => end,
118                None => continue, // overflow: skip this candidate
119            };
120
121            if payload_end <= buf.len() {
122                // Payload is within our buffer — parse directly.
123                if let Some(manifest) = parse_manifest_payload(&buf[payload_start..payload_end]) {
124                    return Ok(Some(manifest));
125                }
126            } else {
127                // Payload extends beyond our buffer — read from file.
128                let file_offset = scan_start + i as u64 + SEGMENT_HEADER_SIZE as u64;
129                reader.seek(SeekFrom::Start(file_offset))?;
130                let mut payload = vec![0u8; payload_length];
131                if reader.read_exact(&mut payload).is_ok() {
132                    if let Some(manifest) = parse_manifest_payload(&payload) {
133                        return Ok(Some(manifest));
134                    }
135                }
136            }
137        }
138    }
139
140    Ok(None)
141}
142
143/// Parse a manifest payload into structured data.
144fn parse_manifest_payload(payload: &[u8]) -> Option<ParsedManifest> {
145    // Minimum header: epoch(4) + dim(2) + total_vectors(8) + seg_count(4) + profile(1) + pad(3) = 22
146    if payload.len() < 22 {
147        return None;
148    }
149
150    let epoch = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
151    let dimension = u16::from_le_bytes([payload[4], payload[5]]);
152    let total_vectors = u64::from_le_bytes([
153        payload[6], payload[7], payload[8], payload[9],
154        payload[10], payload[11], payload[12], payload[13],
155    ]);
156    let seg_count = u32::from_le_bytes([payload[14], payload[15], payload[16], payload[17]]);
157    let profile_id = payload[18];
158
159    let mut offset = 22; // past header (4+2+8+4+1+3)
160
161    // Validate that seg_count does not exceed what the payload can actually hold.
162    // Each directory entry is 25 bytes, so seg_count * 25 + 22 must fit in the payload.
163    let max_possible_entries = payload.len().saturating_sub(22) / 25;
164    if (seg_count as usize) > max_possible_entries {
165        return None;
166    }
167
168    // Parse segment directory.
169    let mut segment_dir = Vec::with_capacity(seg_count as usize);
170    for _ in 0..seg_count {
171        if offset + 25 > payload.len() {
172            return None;
173        }
174        let seg_id = u64::from_le_bytes([
175            payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3],
176            payload[offset + 4], payload[offset + 5], payload[offset + 6], payload[offset + 7],
177        ]);
178        let seg_offset = u64::from_le_bytes([
179            payload[offset + 8], payload[offset + 9], payload[offset + 10], payload[offset + 11],
180            payload[offset + 12], payload[offset + 13], payload[offset + 14], payload[offset + 15],
181        ]);
182        let plen = u64::from_le_bytes([
183            payload[offset + 16], payload[offset + 17], payload[offset + 18], payload[offset + 19],
184            payload[offset + 20], payload[offset + 21], payload[offset + 22], payload[offset + 23],
185        ]);
186        let stype = payload[offset + 24];
187        segment_dir.push(SegDirEntry {
188            seg_id,
189            offset: seg_offset,
190            payload_length: plen,
191            seg_type: stype,
192        });
193        offset += 25;
194    }
195
196    // Parse deletion bitmap.
197    let mut deleted_ids = Vec::new();
198    if offset + 4 <= payload.len() {
199        let del_count = u32::from_le_bytes([
200            payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3],
201        ]);
202        offset += 4;
203        for _ in 0..del_count {
204            if offset + 8 > payload.len() {
205                break;
206            }
207            let did = u64::from_le_bytes([
208                payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3],
209                payload[offset + 4], payload[offset + 5], payload[offset + 6], payload[offset + 7],
210            ]);
211            deleted_ids.push(did);
212            offset += 8;
213        }
214    }
215
216    // Try to parse FileIdentity trailer (backward-compatible).
217    // Look for magic marker 0x46494449 ("FIDI") followed by 68 bytes.
218    let file_identity = if offset + 4 + 68 <= payload.len() {
219        let marker = u32::from_le_bytes([
220            payload[offset], payload[offset + 1],
221            payload[offset + 2], payload[offset + 3],
222        ]);
223        if marker == 0x4649_4449 {
224            offset += 4;
225            let fi_data: &[u8; 68] = payload[offset..offset + 68].try_into().ok()?;
226            Some(FileIdentity::from_bytes(fi_data))
227        } else {
228            None
229        }
230    } else {
231        None
232    };
233
234    Some(ParsedManifest {
235        epoch,
236        dimension,
237        total_vectors,
238        profile_id,
239        segment_dir,
240        deleted_ids,
241        file_identity,
242    })
243}
244
/// Read a VEC_SEG payload and return (id, vector) pairs.
///
/// Layout (all integers and floats little-endian):
/// * `u16` dimension, `u32` vector count (6-byte preamble);
/// * per vector: `u64` id followed by `dimension` `f32` components.
///
/// Returns `None` when the payload is shorter than the preamble, when the
/// declared dimension and count describe more bytes than the payload holds,
/// or when that byte count cannot be represented in `usize`. Bytes after the
/// last declared vector are ignored.
pub(crate) fn read_vec_seg_payload(payload: &[u8]) -> Option<Vec<(u64, Vec<f32>)>> {
    const PREAMBLE_LEN: usize = 6;
    const ID_LEN: usize = 8;
    const F32_LEN: usize = 4;

    if payload.len() < PREAMBLE_LEN {
        return None;
    }

    let dimension = usize::from(u16::from_le_bytes([payload[0], payload[1]]));
    let vector_count =
        u32::from_le_bytes([payload[2], payload[3], payload[4], payload[5]]) as usize;

    // Both fields come from the file, so the size computation must not wrap:
    // on 32-bit targets an unchecked product could overflow, slip past the
    // length check below and lead to out-of-bounds indexing.
    let record_len = dimension.checked_mul(F32_LEN)?.checked_add(ID_LEN)?;
    let expected_size = vector_count
        .checked_mul(record_len)?
        .checked_add(PREAMBLE_LEN)?;
    if payload.len() < expected_size {
        return None;
    }

    // `record_len` is at least `ID_LEN`, so `chunks_exact` never sees a zero size.
    let result = payload[PREAMBLE_LEN..expected_size]
        .chunks_exact(record_len)
        .map(|record| {
            let (id_bytes, component_bytes) = record.split_at(ID_LEN);
            let mut id_raw = [0u8; ID_LEN];
            id_raw.copy_from_slice(id_bytes);
            let vec_data: Vec<f32> = component_bytes
                .chunks_exact(F32_LEN)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            (u64::from_le_bytes(id_raw), vec_data)
        })
        .collect();

    Some(result)
}
284
/// Upper bound, in bytes, on a segment payload this module is willing to read
/// (256 MiB). A `payload_length` header field above this limit is treated as
/// invalid so that a crafted or corrupted file cannot force a huge allocation.
const MAX_READ_PAYLOAD: u64 = 256 << 20;
288
289/// Read a segment's payload from the file given its offset.
290///
291/// Validates magic, enforces a maximum payload size, and verifies the
292/// content hash before returning the data.
293pub(crate) fn read_segment_payload<R: Read + Seek>(
294    reader: &mut R,
295    seg_offset: u64,
296) -> io::Result<(SegmentHeader, Vec<u8>)> {
297    reader.seek(SeekFrom::Start(seg_offset))?;
298
299    let mut hdr_buf = [0u8; SEGMENT_HEADER_SIZE];
300    reader.read_exact(&mut hdr_buf)?;
301
302    let magic = u32::from_le_bytes([hdr_buf[0], hdr_buf[1], hdr_buf[2], hdr_buf[3]]);
303    if magic != SEGMENT_MAGIC {
304        return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid segment magic"));
305    }
306
307    let payload_length = u64::from_le_bytes([
308        hdr_buf[0x10], hdr_buf[0x11], hdr_buf[0x12], hdr_buf[0x13],
309        hdr_buf[0x14], hdr_buf[0x15], hdr_buf[0x16], hdr_buf[0x17],
310    ]);
311
312    // Enforce maximum payload size to prevent OOM from crafted files.
313    if payload_length > MAX_READ_PAYLOAD {
314        return Err(io::Error::new(
315            io::ErrorKind::InvalidData,
316            format!("segment payload too large: {} bytes (max {})", payload_length, MAX_READ_PAYLOAD),
317        ));
318    }
319
320    let header = SegmentHeader {
321        magic,
322        version: hdr_buf[0x04],
323        seg_type: hdr_buf[0x05],
324        flags: u16::from_le_bytes([hdr_buf[0x06], hdr_buf[0x07]]),
325        segment_id: u64::from_le_bytes([
326            hdr_buf[0x08], hdr_buf[0x09], hdr_buf[0x0A], hdr_buf[0x0B],
327            hdr_buf[0x0C], hdr_buf[0x0D], hdr_buf[0x0E], hdr_buf[0x0F],
328        ]),
329        payload_length,
330        timestamp_ns: u64::from_le_bytes([
331            hdr_buf[0x18], hdr_buf[0x19], hdr_buf[0x1A], hdr_buf[0x1B],
332            hdr_buf[0x1C], hdr_buf[0x1D], hdr_buf[0x1E], hdr_buf[0x1F],
333        ]),
334        checksum_algo: hdr_buf[0x20],
335        compression: hdr_buf[0x21],
336        reserved_0: u16::from_le_bytes([hdr_buf[0x22], hdr_buf[0x23]]),
337        reserved_1: u32::from_le_bytes([hdr_buf[0x24], hdr_buf[0x25], hdr_buf[0x26], hdr_buf[0x27]]),
338        content_hash: {
339            let mut h = [0u8; 16];
340            h.copy_from_slice(&hdr_buf[0x28..0x38]);
341            h
342        },
343        uncompressed_len: u32::from_le_bytes([hdr_buf[0x38], hdr_buf[0x39], hdr_buf[0x3A], hdr_buf[0x3B]]),
344        alignment_pad: u32::from_le_bytes([hdr_buf[0x3C], hdr_buf[0x3D], hdr_buf[0x3E], hdr_buf[0x3F]]),
345    };
346
347    // payload_length is guaranteed <= MAX_READ_PAYLOAD (256 MiB) which fits in usize.
348    let mut payload = vec![0u8; payload_length as usize];
349    reader.read_exact(&mut payload)?;
350
351    // Verify content hash if it is non-zero (zero hash means "not set").
352    if header.content_hash != [0u8; 16] {
353        let computed = compute_content_hash(&payload);
354        if computed != header.content_hash {
355            return Err(io::Error::new(
356                io::ErrorKind::InvalidData,
357                "segment content hash mismatch",
358            ));
359        }
360    }
361
362    Ok((header, payload))
363}
364
365/// Compute a 16-byte content hash matching the write path's algorithm.
366/// Uses CRC32 with rotations to fill 16 bytes.
367fn compute_content_hash(data: &[u8]) -> [u8; 16] {
368    let mut hash = [0u8; 16];
369    let crc = crc32_for_verify(data);
370    for i in 0..4 {
371        let rotated = crc.rotate_left(i as u32 * 8);
372        hash[i * 4..(i + 1) * 4].copy_from_slice(&rotated.to_le_bytes());
373    }
374    hash
375}
376
/// Bitwise CRC32 over `data` (intended to match `write_path::crc32_slice`).
///
/// Uses the reflected polynomial `0xEDB88320`, an initial state of all ones
/// and a final bit inversion, processing one bit at a time without a table.
fn crc32_for_verify(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let state = data.iter().fold(0xFFFF_FFFFu32, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            // All ones when the low bit is set, zero otherwise: selects whether
            // the polynomial is folded in for this shift.
            let mask = (bits & 1).wrapping_neg();
            (bits >> 1) ^ (POLY & mask)
        })
    });

    !state
}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Payloads shorter than the fixed manifest header are rejected.
    #[test]
    fn parse_empty_manifest() {
        let too_short: [&[u8]; 2] = [&[], &[0u8; 10]];
        for &payload in too_short.iter() {
            assert!(parse_manifest_payload(payload).is_none());
        }
    }

    /// A hand-built VEC_SEG payload decodes back to the ids and components
    /// it was assembled from.
    #[test]
    fn vec_seg_round_trip() {
        // dim=2, two vectors: id=10 -> [1.0, 2.0], id=20 -> [3.0, 4.0].
        let dim: u16 = 2;
        let vectors: [(u64, [f32; 2]); 2] = [(10, [1.0, 2.0]), (20, [3.0, 4.0])];

        let mut payload = Vec::new();
        payload.extend_from_slice(&dim.to_le_bytes());
        payload.extend_from_slice(&(vectors.len() as u32).to_le_bytes());
        for (id, components) in vectors.iter() {
            payload.extend_from_slice(&id.to_le_bytes());
            for component in components.iter() {
                payload.extend_from_slice(&component.to_le_bytes());
            }
        }

        let decoded = read_vec_seg_payload(&payload).unwrap();
        assert_eq!(decoded.len(), vectors.len());
        for ((got_id, got_vec), (want_id, want_vec)) in decoded.iter().zip(vectors.iter()) {
            assert_eq!(got_id, want_id);
            assert_eq!(got_vec.as_slice(), &want_vec[..]);
        }
    }
}