1use rvf_types::{FileIdentity, SegmentHeader, SegmentType, SEGMENT_HEADER_SIZE, SEGMENT_MAGIC};
10use std::collections::HashMap;
11use std::io::{self, Read, Seek, SeekFrom};
12
/// One record of a manifest's segment directory.
///
/// Decoded by `parse_manifest_payload` from a fixed 25-byte record: three
/// little-endian `u64`s followed by a one-byte segment type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SegDirEntry {
    /// Segment identifier.
    pub seg_id: u64,
    /// Segment location in the file.
    // NOTE(review): presumably the absolute offset of the segment header, as
    // consumed by `read_segment_payload` — confirm against the writer.
    pub offset: u64,
    /// Payload length in bytes, as recorded in the directory.
    pub payload_length: u64,
    /// Raw segment type byte.
    pub seg_type: u8,
}
21
22#[derive(Clone, Debug)]
24#[allow(dead_code)]
25pub(crate) struct ParsedManifest {
26 pub epoch: u32,
27 pub dimension: u16,
28 pub total_vectors: u64,
29 pub profile_id: u8,
30 pub segment_dir: Vec<SegDirEntry>,
31 pub deleted_ids: Vec<u64>,
32 pub file_identity: Option<FileIdentity>,
33}
34
/// In-memory store mapping a vector id to its components.
// NOTE(review): `dead_code` is allowed presumably because not every member
// (e.g. `dimension`) is read by the rest of the crate yet — confirm.
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct VectorData {
    /// Vectors keyed by id.
    pub vectors: HashMap<u64, Vec<f32>>,
    /// Dimension recorded at construction; `insert` does not check vectors against it.
    pub dimension: u16,
}

impl VectorData {
    /// Creates an empty store that records `dimension` as its vector dimension.
    pub(crate) fn new(dimension: u16) -> Self {
        Self {
            vectors: HashMap::new(),
            dimension,
        }
    }

    /// Returns the components of vector `id`, or `None` if it is not stored.
    pub(crate) fn get(&self, id: u64) -> Option<&[f32]> {
        self.vectors.get(&id).map(Vec::as_slice)
    }

    /// Stores `data` under `id`, replacing any vector previously stored there.
    pub(crate) fn insert(&mut self, id: u64, data: Vec<f32>) {
        self.vectors.insert(id, data);
    }

    /// Removes vector `id`; does nothing if it is not stored.
    pub(crate) fn remove(&mut self, id: u64) {
        self.vectors.remove(&id);
    }

    /// Number of stored vectors.
    pub(crate) fn len(&self) -> usize {
        self.vectors.len()
    }

    /// Iterates over the stored ids in unspecified (hash-map) order.
    pub(crate) fn ids(&self) -> impl Iterator<Item = &u64> {
        self.vectors.keys()
    }
}
71
72pub(crate) fn find_latest_manifest<R: Read + Seek>(reader: &mut R) -> io::Result<Option<ParsedManifest>> {
77 let file_size = reader.seek(SeekFrom::End(0))?;
78 if file_size < SEGMENT_HEADER_SIZE as u64 {
79 return Ok(None);
80 }
81
82 let scan_size = std::cmp::min(file_size, 65_536) as usize;
85 let scan_start = file_size - scan_size as u64;
86 reader.seek(SeekFrom::Start(scan_start))?;
87 let mut buf = vec![0u8; scan_size];
88 reader.read_exact(&mut buf)?;
89
90 let magic_bytes = SEGMENT_MAGIC.to_le_bytes();
91 let manifest_type = SegmentType::Manifest as u8;
92
93 if buf.len() < SEGMENT_HEADER_SIZE {
96 return Ok(None);
97 }
98
99 let last_possible = buf.len() - SEGMENT_HEADER_SIZE;
100 for i in (0..=last_possible).rev() {
101 if buf[i..i + 4] == magic_bytes && buf[i + 5] == manifest_type {
102 let hdr_buf = &buf[i..i + SEGMENT_HEADER_SIZE];
104 let payload_length_u64 = u64::from_le_bytes([
105 hdr_buf[0x10], hdr_buf[0x11], hdr_buf[0x12], hdr_buf[0x13],
106 hdr_buf[0x14], hdr_buf[0x15], hdr_buf[0x16], hdr_buf[0x17],
107 ]);
108
109 if payload_length_u64 > MAX_READ_PAYLOAD {
111 continue;
112 }
113 let payload_length = payload_length_u64 as usize;
114
115 let payload_start = i + SEGMENT_HEADER_SIZE;
116 let payload_end = match payload_start.checked_add(payload_length) {
117 Some(end) => end,
118 None => continue, };
120
121 if payload_end <= buf.len() {
122 if let Some(manifest) = parse_manifest_payload(&buf[payload_start..payload_end]) {
124 return Ok(Some(manifest));
125 }
126 } else {
127 let file_offset = scan_start + i as u64 + SEGMENT_HEADER_SIZE as u64;
129 reader.seek(SeekFrom::Start(file_offset))?;
130 let mut payload = vec![0u8; payload_length];
131 if reader.read_exact(&mut payload).is_ok() {
132 if let Some(manifest) = parse_manifest_payload(&payload) {
133 return Ok(Some(manifest));
134 }
135 }
136 }
137 }
138 }
139
140 Ok(None)
141}
142
143fn parse_manifest_payload(payload: &[u8]) -> Option<ParsedManifest> {
145 if payload.len() < 22 {
147 return None;
148 }
149
150 let epoch = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
151 let dimension = u16::from_le_bytes([payload[4], payload[5]]);
152 let total_vectors = u64::from_le_bytes([
153 payload[6], payload[7], payload[8], payload[9],
154 payload[10], payload[11], payload[12], payload[13],
155 ]);
156 let seg_count = u32::from_le_bytes([payload[14], payload[15], payload[16], payload[17]]);
157 let profile_id = payload[18];
158
159 let mut offset = 22; let max_possible_entries = payload.len().saturating_sub(22) / 25;
164 if (seg_count as usize) > max_possible_entries {
165 return None;
166 }
167
168 let mut segment_dir = Vec::with_capacity(seg_count as usize);
170 for _ in 0..seg_count {
171 if offset + 25 > payload.len() {
172 return None;
173 }
174 let seg_id = u64::from_le_bytes([
175 payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3],
176 payload[offset + 4], payload[offset + 5], payload[offset + 6], payload[offset + 7],
177 ]);
178 let seg_offset = u64::from_le_bytes([
179 payload[offset + 8], payload[offset + 9], payload[offset + 10], payload[offset + 11],
180 payload[offset + 12], payload[offset + 13], payload[offset + 14], payload[offset + 15],
181 ]);
182 let plen = u64::from_le_bytes([
183 payload[offset + 16], payload[offset + 17], payload[offset + 18], payload[offset + 19],
184 payload[offset + 20], payload[offset + 21], payload[offset + 22], payload[offset + 23],
185 ]);
186 let stype = payload[offset + 24];
187 segment_dir.push(SegDirEntry {
188 seg_id,
189 offset: seg_offset,
190 payload_length: plen,
191 seg_type: stype,
192 });
193 offset += 25;
194 }
195
196 let mut deleted_ids = Vec::new();
198 if offset + 4 <= payload.len() {
199 let del_count = u32::from_le_bytes([
200 payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3],
201 ]);
202 offset += 4;
203 for _ in 0..del_count {
204 if offset + 8 > payload.len() {
205 break;
206 }
207 let did = u64::from_le_bytes([
208 payload[offset], payload[offset + 1], payload[offset + 2], payload[offset + 3],
209 payload[offset + 4], payload[offset + 5], payload[offset + 6], payload[offset + 7],
210 ]);
211 deleted_ids.push(did);
212 offset += 8;
213 }
214 }
215
216 let file_identity = if offset + 4 + 68 <= payload.len() {
219 let marker = u32::from_le_bytes([
220 payload[offset], payload[offset + 1],
221 payload[offset + 2], payload[offset + 3],
222 ]);
223 if marker == 0x4649_4449 {
224 offset += 4;
225 let fi_data: &[u8; 68] = payload[offset..offset + 68].try_into().ok()?;
226 Some(FileIdentity::from_bytes(fi_data))
227 } else {
228 None
229 }
230 } else {
231 None
232 };
233
234 Some(ParsedManifest {
235 epoch,
236 dimension,
237 total_vectors,
238 profile_id,
239 segment_dir,
240 deleted_ids,
241 file_identity,
242 })
243}
244
/// Decodes a vector segment payload into `(id, components)` pairs.
///
/// Layout (little-endian): a `u16` dimension and a `u32` vector count, then
/// `count` records, each a `u64` id followed by `dimension` `f32` values.
/// Bytes after the last record are ignored.
///
/// Returns `None` if the 6-byte header is missing, if the declared records do
/// not fit in the payload, or if their total size overflows `usize` (possible
/// on 32-bit targets).
pub(crate) fn read_vec_seg_payload(payload: &[u8]) -> Option<Vec<(u64, Vec<f32>)>> {
    const HEADER_LEN: usize = 6;

    if payload.len() < HEADER_LEN {
        return None;
    }
    let (header, body) = payload.split_at(HEADER_LEN);
    let dimension = usize::from(u16::from_le_bytes([header[0], header[1]]));
    let vector_count =
        usize::try_from(u32::from_le_bytes([header[2], header[3], header[4], header[5]])).ok()?;

    // Checked arithmetic: on 32-bit targets `count * record_len` can overflow.
    // That would wrap past the size check and then panic on allocation or indexing.
    let record_len = dimension.checked_mul(4)?.checked_add(8)?;
    let body_len = vector_count.checked_mul(record_len)?;
    if body.len() < body_len {
        return None;
    }

    // `record_len >= 8`, so `chunks_exact` never gets a zero chunk size.
    let vectors: Vec<(u64, Vec<f32>)> = body[..body_len]
        .chunks_exact(record_len)
        .map(|record| {
            let (id_bytes, value_bytes) = record.split_at(8);
            let mut id = [0u8; 8];
            id.copy_from_slice(id_bytes);
            let values: Vec<f32> = value_bytes
                .chunks_exact(4)
                .map(|v| f32::from_le_bytes([v[0], v[1], v[2], v[3]]))
                .collect();
            (u64::from_le_bytes(id), values)
        })
        .collect();

    Some(vectors)
}
284
/// Largest segment payload, in bytes (256 MiB), that will be read into memory.
/// A header declaring a longer payload is rejected before any allocation.
const MAX_READ_PAYLOAD: u64 = 1 << 28;
288
289pub(crate) fn read_segment_payload<R: Read + Seek>(
294 reader: &mut R,
295 seg_offset: u64,
296) -> io::Result<(SegmentHeader, Vec<u8>)> {
297 reader.seek(SeekFrom::Start(seg_offset))?;
298
299 let mut hdr_buf = [0u8; SEGMENT_HEADER_SIZE];
300 reader.read_exact(&mut hdr_buf)?;
301
302 let magic = u32::from_le_bytes([hdr_buf[0], hdr_buf[1], hdr_buf[2], hdr_buf[3]]);
303 if magic != SEGMENT_MAGIC {
304 return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid segment magic"));
305 }
306
307 let payload_length = u64::from_le_bytes([
308 hdr_buf[0x10], hdr_buf[0x11], hdr_buf[0x12], hdr_buf[0x13],
309 hdr_buf[0x14], hdr_buf[0x15], hdr_buf[0x16], hdr_buf[0x17],
310 ]);
311
312 if payload_length > MAX_READ_PAYLOAD {
314 return Err(io::Error::new(
315 io::ErrorKind::InvalidData,
316 format!("segment payload too large: {} bytes (max {})", payload_length, MAX_READ_PAYLOAD),
317 ));
318 }
319
320 let header = SegmentHeader {
321 magic,
322 version: hdr_buf[0x04],
323 seg_type: hdr_buf[0x05],
324 flags: u16::from_le_bytes([hdr_buf[0x06], hdr_buf[0x07]]),
325 segment_id: u64::from_le_bytes([
326 hdr_buf[0x08], hdr_buf[0x09], hdr_buf[0x0A], hdr_buf[0x0B],
327 hdr_buf[0x0C], hdr_buf[0x0D], hdr_buf[0x0E], hdr_buf[0x0F],
328 ]),
329 payload_length,
330 timestamp_ns: u64::from_le_bytes([
331 hdr_buf[0x18], hdr_buf[0x19], hdr_buf[0x1A], hdr_buf[0x1B],
332 hdr_buf[0x1C], hdr_buf[0x1D], hdr_buf[0x1E], hdr_buf[0x1F],
333 ]),
334 checksum_algo: hdr_buf[0x20],
335 compression: hdr_buf[0x21],
336 reserved_0: u16::from_le_bytes([hdr_buf[0x22], hdr_buf[0x23]]),
337 reserved_1: u32::from_le_bytes([hdr_buf[0x24], hdr_buf[0x25], hdr_buf[0x26], hdr_buf[0x27]]),
338 content_hash: {
339 let mut h = [0u8; 16];
340 h.copy_from_slice(&hdr_buf[0x28..0x38]);
341 h
342 },
343 uncompressed_len: u32::from_le_bytes([hdr_buf[0x38], hdr_buf[0x39], hdr_buf[0x3A], hdr_buf[0x3B]]),
344 alignment_pad: u32::from_le_bytes([hdr_buf[0x3C], hdr_buf[0x3D], hdr_buf[0x3E], hdr_buf[0x3F]]),
345 };
346
347 let mut payload = vec![0u8; payload_length as usize];
349 reader.read_exact(&mut payload)?;
350
351 if header.content_hash != [0u8; 16] {
353 let computed = compute_content_hash(&payload);
354 if computed != header.content_hash {
355 return Err(io::Error::new(
356 io::ErrorKind::InvalidData,
357 "segment content hash mismatch",
358 ));
359 }
360 }
361
362 Ok((header, payload))
363}
364
/// Derives the 16-byte segment content hash from a CRC-32 of `data`.
///
/// The hash is four little-endian copies of the CRC, rotated left by 0, 8, 16
/// and 24 bits respectively. It is an integrity check, not a cryptographic hash.
fn compute_content_hash(data: &[u8]) -> [u8; 16] {
    let crc = crc32_for_verify(data);
    let mut hash = [0u8; 16];
    for (lane, slot) in hash.chunks_exact_mut(4).enumerate() {
        slot.copy_from_slice(&crc.rotate_left(lane as u32 * 8).to_le_bytes());
    }
    hash
}

/// CRC-32 with the reflected polynomial `0xEDB8_8320`, initial value
/// `0xFFFF_FFFF` and a final bit inversion (the common zlib/IEEE variant).
///
/// Table-driven: one lookup per byte instead of eight shift/xor steps. This
/// matters for payloads of up to `MAX_READ_PAYLOAD` bytes.
fn crc32_for_verify(data: &[u8]) -> u32 {
    // Per-byte remainders, computed at compile time with the bitwise algorithm.
    const TABLE: [u32; 256] = {
        let mut table = [0u32; 256];
        let mut i = 0;
        while i < 256 {
            let mut crc = i as u32;
            let mut bit = 0;
            while bit < 8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ 0xEDB8_8320
                } else {
                    crc >> 1
                };
                bit += 1;
            }
            table[i] = crc;
            i += 1;
        }
        table
    };

    !data.iter().fold(0xFFFF_FFFF_u32, |crc, &byte| {
        TABLE[((crc ^ u32::from(byte)) & 0xFF) as usize] ^ (crc >> 8)
    })
}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds a vector-segment payload in the layout `read_vec_seg_payload` reads.
    fn vec_seg_bytes(dim: u16, declared_count: u32, records: &[(u64, Vec<f32>)]) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(&dim.to_le_bytes());
        out.extend_from_slice(&declared_count.to_le_bytes());
        for (id, values) in records {
            out.extend_from_slice(&id.to_le_bytes());
            for v in values {
                out.extend_from_slice(&v.to_le_bytes());
            }
        }
        out
    }

    /// Builds the fixed 22-byte manifest header.
    fn manifest_header(epoch: u32, dim: u16, total: u64, seg_count: u32, profile: u8) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(&epoch.to_le_bytes());
        out.extend_from_slice(&dim.to_le_bytes());
        out.extend_from_slice(&total.to_le_bytes());
        out.extend_from_slice(&seg_count.to_le_bytes());
        out.push(profile);
        out.extend_from_slice(&[0u8; 3]);
        out
    }

    /// Wraps `payload` in a minimal segment header of type `seg_type`.
    fn segment_bytes(seg_type: u8, payload: &[u8]) -> Vec<u8> {
        let mut out = vec![0u8; SEGMENT_HEADER_SIZE];
        out[0..4].copy_from_slice(&SEGMENT_MAGIC.to_le_bytes());
        out[5] = seg_type;
        out[0x10..0x18].copy_from_slice(&(payload.len() as u64).to_le_bytes());
        out.extend_from_slice(payload);
        out
    }

    #[test]
    fn parse_empty_manifest() {
        assert!(parse_manifest_payload(&[]).is_none());
        assert!(parse_manifest_payload(&[0u8; 10]).is_none());
    }

    #[test]
    fn parse_minimal_manifest() {
        let m = parse_manifest_payload(&manifest_header(3, 128, 42, 0, 1)).unwrap();
        assert_eq!(m.epoch, 3);
        assert_eq!(m.dimension, 128);
        assert_eq!(m.total_vectors, 42);
        assert_eq!(m.profile_id, 1);
        assert!(m.segment_dir.is_empty());
        assert!(m.deleted_ids.is_empty());
        assert!(m.file_identity.is_none());
    }

    #[test]
    fn parse_manifest_with_directory_and_deletions() {
        let mut p = manifest_header(1, 4, 2, 1, 0);
        p.extend_from_slice(&9u64.to_le_bytes());
        p.extend_from_slice(&4096u64.to_le_bytes());
        p.extend_from_slice(&100u64.to_le_bytes());
        p.push(7);
        p.extend_from_slice(&2u32.to_le_bytes());
        p.extend_from_slice(&5u64.to_le_bytes());
        p.extend_from_slice(&6u64.to_le_bytes());

        let m = parse_manifest_payload(&p).unwrap();
        assert_eq!(m.segment_dir.len(), 1);
        let e = &m.segment_dir[0];
        assert_eq!((e.seg_id, e.offset, e.payload_length, e.seg_type), (9, 4096, 100, 7));
        assert_eq!(m.deleted_ids, vec![5, 6]);
        assert!(m.file_identity.is_none());
    }

    #[test]
    fn parse_manifest_rejects_oversized_segment_count() {
        assert!(parse_manifest_payload(&manifest_header(1, 4, 0, 1, 0)).is_none());
    }

    #[test]
    fn vec_seg_round_trip() {
        let bytes = vec_seg_bytes(2, 2, &[(10, vec![1.0, 2.0]), (20, vec![3.0, 4.0])]);
        let result = read_vec_seg_payload(&bytes).unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, 10);
        assert_eq!(result[0].1, vec![1.0, 2.0]);
        assert_eq!(result[1].0, 20);
        assert_eq!(result[1].1, vec![3.0, 4.0]);
    }

    #[test]
    fn vec_seg_rejects_short_or_truncated_payload() {
        assert!(read_vec_seg_payload(&[0u8; 5]).is_none());
        let truncated = vec_seg_bytes(2, 2, &[(10, vec![1.0, 2.0])]);
        assert!(read_vec_seg_payload(&truncated).is_none());
    }

    #[test]
    fn vec_seg_empty() {
        assert!(read_vec_seg_payload(&vec_seg_bytes(3, 0, &[])).unwrap().is_empty());
    }

    #[test]
    fn crc32_matches_reference_vector() {
        assert_eq!(crc32_for_verify(b"123456789"), 0xCBF4_3926);
        assert_eq!(compute_content_hash(b""), [0u8; 16]);
    }

    #[test]
    fn finds_manifest_at_end_of_file() {
        let mut file = vec![0u8; 100];
        file.extend(segment_bytes(SegmentType::Manifest as u8, &manifest_header(5, 8, 0, 0, 0)));
        let m = find_latest_manifest(&mut Cursor::new(file)).unwrap().unwrap();
        assert_eq!(m.epoch, 5);
        assert_eq!(m.dimension, 8);
    }

    #[test]
    fn newest_manifest_wins() {
        let manifest = SegmentType::Manifest as u8;
        let mut file = segment_bytes(manifest, &manifest_header(1, 8, 0, 0, 0));
        file.extend(segment_bytes(manifest, &manifest_header(2, 8, 0, 0, 0)));
        let m = find_latest_manifest(&mut Cursor::new(file)).unwrap().unwrap();
        assert_eq!(m.epoch, 2);
    }

    #[test]
    fn skips_manifest_candidate_truncated_by_eof() {
        let mut file = segment_bytes(SegmentType::Manifest as u8, &manifest_header(5, 8, 0, 0, 0));
        file.truncate(file.len() - 1);
        assert!(find_latest_manifest(&mut Cursor::new(file)).unwrap().is_none());
    }

    #[test]
    fn tiny_file_has_no_manifest() {
        assert!(find_latest_manifest(&mut Cursor::new(vec![0u8; 3])).unwrap().is_none());
    }
}