use rvf_types::{SegmentHeader, SegmentType, SEGMENT_HEADER_SIZE};
use std::io::{self, Seek, Write};

/// Appends serialized segments (header + payload) to a writer and hands out
/// monotonically increasing segment IDs.
pub(crate) struct SegmentWriter {
    /// ID that will be assigned to the next segment written.
    next_seg_id: u64,
}

impl SegmentWriter {
    pub(crate) fn new(starting_id: u64) -> Self {
        Self {
            next_seg_id: starting_id,
        }
    }

    /// Allocates the next segment ID, panicking if the counter would overflow.
    pub(crate) fn alloc_seg_id(&mut self) -> u64 {
        let id = self.next_seg_id;
        self.next_seg_id = self.next_seg_id.checked_add(1)
            .expect("segment ID counter overflow");
        id
    }

    /// Writes a VEC segment. The payload layout is: dimension (u16 LE),
    /// vector count (u32 LE), then for each vector its ID (u64 LE) followed
    /// by its f32 components (LE).
    pub(crate) fn write_vec_seg<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        vectors: &[&[f32]],
        ids: &[u64],
        dimension: u16,
    ) -> io::Result<(u64, u64)> {
        let seg_id = self.alloc_seg_id();

        let vector_count = vectors.len() as u32;
        let bytes_per_vec = (dimension as usize) * 4;
        let payload_size = 2 + 4 + (vectors.len() * (8 + bytes_per_vec));
        let mut payload = Vec::with_capacity(payload_size);

        payload.extend_from_slice(&dimension.to_le_bytes());
        payload.extend_from_slice(&vector_count.to_le_bytes());
        for (vec_data, &vec_id) in vectors.iter().zip(ids.iter()) {
            payload.extend_from_slice(&vec_id.to_le_bytes());
            for &val in *vec_data {
                payload.extend_from_slice(&val.to_le_bytes());
            }
        }

        let offset = self.write_segment(writer, SegmentType::Vec as u8, seg_id, &payload)?;
        Ok((seg_id, offset))
    }

    /// Writes a JOURNAL segment recording deleted vector IDs. The payload
    /// layout is: entry count (u32 LE), epoch (u32 LE), a reserved u64
    /// written as zero, then one 12-byte entry per deleted ID.
    pub(crate) fn write_journal_seg<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        deleted_ids: &[u64],
        epoch: u32,
    ) -> io::Result<(u64, u64)> {
        let seg_id = self.alloc_seg_id();

        let entry_count = deleted_ids.len() as u32;
        // Fixed header (4 + 4 + 8 bytes) plus 12 bytes per entry.
        let payload_size = 16 + (deleted_ids.len() * 12);
        let mut payload = Vec::with_capacity(payload_size);

        payload.extend_from_slice(&entry_count.to_le_bytes());
        payload.extend_from_slice(&epoch.to_le_bytes());
        payload.extend_from_slice(&0u64.to_le_bytes()); // reserved, written as zero

        for &vid in deleted_ids {
            payload.push(0x01); // entry tag
            payload.push(0x00); // flags / padding byte
            payload.extend_from_slice(&8u16.to_le_bytes()); // length of the value that follows
            payload.extend_from_slice(&vid.to_le_bytes()); // deleted vector ID
        }

        let offset = self.write_segment(writer, SegmentType::Journal as u8, seg_id, &payload)?;
        Ok((seg_id, offset))
    }

    /// Writes a META segment whose payload is passed through verbatim.
    #[allow(dead_code)]
    pub(crate) fn write_meta_seg<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        metadata_payload: &[u8],
    ) -> io::Result<(u64, u64)> {
        let seg_id = self.alloc_seg_id();
        let offset = self.write_segment(writer, SegmentType::Meta as u8, seg_id, metadata_payload)?;
        Ok((seg_id, offset))
    }

    /// Writes a MANIFEST segment without a file identity block.
    /// See `write_manifest_seg_with_identity` for the payload layout.
    #[allow(clippy::too_many_arguments, dead_code)]
    pub(crate) fn write_manifest_seg<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        epoch: u32,
        dimension: u16,
        total_vectors: u64,
        profile_id: u8,
        segment_dir: &[(u64, u64, u64, u8)],
        deleted_ids: &[u64],
    ) -> io::Result<(u64, u64)> {
        self.write_manifest_seg_with_identity(
            writer, epoch, dimension, total_vectors, profile_id,
            segment_dir, deleted_ids, None,
        )
    }

    /// Writes a MANIFEST segment, optionally appending a file identity block.
    ///
    /// Payload layout: epoch (u32), dimension (u16), total vector count (u64),
    /// directory entry count (u32), profile ID (u8), 3 padding bytes, then one
    /// 25-byte directory entry per segment, the deleted-ID count (u32) and the
    /// deleted IDs (u64 each), and finally an optional identity marker plus the
    /// serialized `FileIdentity`. All integers are little-endian.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_manifest_seg_with_identity<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        epoch: u32,
        dimension: u16,
        total_vectors: u64,
        profile_id: u8,
        segment_dir: &[(u64, u64, u64, u8)],
        deleted_ids: &[u64],
        file_identity: Option<&rvf_types::FileIdentity>,
    ) -> io::Result<(u64, u64)> {
        let seg_id = self.alloc_seg_id();

        let seg_count = segment_dir.len() as u32;
        let del_count = deleted_ids.len() as u32;
        let payload_size = 4 + 2 + 8 + 4 + 1 + 3                 // fixed header fields
            + (segment_dir.len() * (8 + 8 + 8 + 1))              // directory entries
            + 4 + (deleted_ids.len() * 8)                        // deleted-ID list
            + if file_identity.is_some() { 4 + 68 } else { 0 };  // identity marker + body
        let mut payload = Vec::with_capacity(payload_size);

        payload.extend_from_slice(&epoch.to_le_bytes());
        payload.extend_from_slice(&dimension.to_le_bytes());
        payload.extend_from_slice(&total_vectors.to_le_bytes());
        payload.extend_from_slice(&seg_count.to_le_bytes());
        payload.push(profile_id);
        payload.extend_from_slice(&[0u8; 3]); // padding

        // Segment directory: (segment_id, offset, payload_length, segment_type).
        for &(sid, off, plen, stype) in segment_dir {
            payload.extend_from_slice(&sid.to_le_bytes());
            payload.extend_from_slice(&off.to_le_bytes());
            payload.extend_from_slice(&plen.to_le_bytes());
            payload.push(stype);
        }

        payload.extend_from_slice(&del_count.to_le_bytes());
        for &did in deleted_ids {
            payload.extend_from_slice(&did.to_le_bytes());
        }

        if let Some(fi) = file_identity {
            payload.extend_from_slice(&0x4649_4449u32.to_le_bytes()); // identity marker
            payload.extend_from_slice(&fi.to_bytes());
        }

        let offset = self.write_segment(writer, SegmentType::Manifest as u8, seg_id, &payload)?;
        Ok((seg_id, offset))
    }

    /// Maximum accepted kernel image size: 128 MiB.
    #[allow(dead_code)]
    const MAX_KERNEL_IMAGE_SIZE: usize = 128 * 1024 * 1024;

    /// Writes a KERNEL segment. The payload is the 128-byte kernel header,
    /// followed by the kernel image and an optional command line. Images
    /// larger than `Self::MAX_KERNEL_IMAGE_SIZE` are rejected.
    #[allow(dead_code)]
    pub(crate) fn write_kernel_seg<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        kernel_header_bytes: &[u8; 128],
        kernel_image: &[u8],
        cmdline: Option<&[u8]>,
    ) -> io::Result<(u64, u64)> {
        if kernel_image.len() > Self::MAX_KERNEL_IMAGE_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "kernel image too large: {} bytes (max {})",
                    kernel_image.len(),
                    Self::MAX_KERNEL_IMAGE_SIZE
                ),
            ));
        }

        let seg_id = self.alloc_seg_id();

        let cmdline_len = cmdline.map_or(0, |c| c.len());
        let payload_size = 128 + kernel_image.len() + cmdline_len;
        let mut payload = Vec::with_capacity(payload_size);

        payload.extend_from_slice(kernel_header_bytes);
        payload.extend_from_slice(kernel_image);
        if let Some(cl) = cmdline {
            payload.extend_from_slice(cl);
        }

        let offset = self.write_segment(writer, SegmentType::Kernel as u8, seg_id, &payload)?;
        Ok((seg_id, offset))
    }

    /// Maximum accepted eBPF program size (bytecode plus BTF data): 16 MiB.
    #[allow(dead_code)]
    const MAX_EBPF_PROGRAM_SIZE: usize = 16 * 1024 * 1024;

    /// Writes an EBPF segment. The payload is the 64-byte eBPF header,
    /// followed by the program bytecode and optional BTF data. Programs whose
    /// combined size exceeds `Self::MAX_EBPF_PROGRAM_SIZE` are rejected.
    #[allow(dead_code)]
    pub(crate) fn write_ebpf_seg<W: Write + Seek>(
        &mut self,
        writer: &mut W,
        ebpf_header_bytes: &[u8; 64],
        program_bytecode: &[u8],
        btf_data: Option<&[u8]>,
    ) -> io::Result<(u64, u64)> {
        let btf_len = btf_data.map_or(0, |b| b.len());
        let total_program_size = program_bytecode.len().saturating_add(btf_len);
        if total_program_size > Self::MAX_EBPF_PROGRAM_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "eBPF program too large: {} bytes (max {})",
                    total_program_size,
                    Self::MAX_EBPF_PROGRAM_SIZE
                ),
            ));
        }

        let seg_id = self.alloc_seg_id();

        let payload_size = 64 + program_bytecode.len() + btf_len;
        let mut payload = Vec::with_capacity(payload_size);

        payload.extend_from_slice(ebpf_header_bytes);
        payload.extend_from_slice(program_bytecode);
        if let Some(btf) = btf_data {
            payload.extend_from_slice(btf);
        }

        let offset = self.write_segment(writer, SegmentType::Ebpf as u8, seg_id, &payload)?;
        Ok((seg_id, offset))
    }

    /// Serializes and writes a segment header followed by its payload at the
    /// current stream position, returning the offset of the header. The header
    /// records the payload length and a content hash of the payload.
    fn write_segment<W: Write + Seek>(
        &self,
        writer: &mut W,
        seg_type: u8,
        seg_id: u64,
        payload: &[u8],
    ) -> io::Result<u64> {
        let offset = writer.stream_position()?;

        let mut header = SegmentHeader::new(seg_type, seg_id);
        header.payload_length = payload.len() as u64;

        let hash = content_hash(payload);
        header.content_hash = hash;

        let header_bytes = header_to_bytes(&header);
        writer.write_all(&header_bytes)?;

        writer.write_all(payload)?;

        Ok(offset)
    }

    /// Returns the ID that will be assigned to the next segment.
    #[allow(dead_code)]
    pub(crate) fn next_id(&self) -> u64 {
        self.next_seg_id
    }
}

/// Serializes a `SegmentHeader` into its fixed-size little-endian on-disk
/// layout.
fn header_to_bytes(h: &SegmentHeader) -> [u8; SEGMENT_HEADER_SIZE] {
    let mut buf = [0u8; SEGMENT_HEADER_SIZE];
    buf[0x00..0x04].copy_from_slice(&h.magic.to_le_bytes());
    buf[0x04] = h.version;
    buf[0x05] = h.seg_type;
    buf[0x06..0x08].copy_from_slice(&h.flags.to_le_bytes());
    buf[0x08..0x10].copy_from_slice(&h.segment_id.to_le_bytes());
    buf[0x10..0x18].copy_from_slice(&h.payload_length.to_le_bytes());
    buf[0x18..0x20].copy_from_slice(&h.timestamp_ns.to_le_bytes());
    buf[0x20] = h.checksum_algo;
    buf[0x21] = h.compression;
    buf[0x22..0x24].copy_from_slice(&h.reserved_0.to_le_bytes());
    buf[0x24..0x28].copy_from_slice(&h.reserved_1.to_le_bytes());
    buf[0x28..0x38].copy_from_slice(&h.content_hash);
    buf[0x38..0x3C].copy_from_slice(&h.uncompressed_len.to_le_bytes());
    buf[0x3C..0x40].copy_from_slice(&h.alignment_pad.to_le_bytes());
    buf
}

/// Builds a 16-byte content hash by tiling four byte-rotated copies of the
/// payload's CRC-32.
fn content_hash(data: &[u8]) -> [u8; 16] {
    let mut hash = [0u8; 16];
    let crc = crc32_slice(data);
    for i in 0..4 {
        // Each 4-byte lane holds the CRC rotated left by 0, 8, 16, or 24 bits.
        let rotated = crc.rotate_left(i as u32 * 8);
        hash[i * 4..(i + 1) * 4].copy_from_slice(&rotated.to_le_bytes());
    }
    hash
}

/// Bitwise CRC-32 (reflected, polynomial 0xEDB88320) over the slice.
fn crc32_slice(data: &[u8]) -> u32 {
    let mut crc: u32 = 0xFFFFFFFF;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            if crc & 1 != 0 {
                crc = (crc >> 1) ^ 0xEDB88320;
            } else {
                crc >>= 1;
            }
        }
    }
    !crc
}

#[cfg(test)]
mod tests {
    use super::*;
    use rvf_types::SEGMENT_MAGIC;
    use std::io::Cursor;

    #[test]
    fn write_vec_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(1);

        let v1: Vec<f32> = vec![1.0, 2.0, 3.0];
        let v2: Vec<f32> = vec![4.0, 5.0, 6.0];
        let vectors: Vec<&[f32]> = vec![&v1, &v2];
        let ids = vec![10u64, 20u64];

        let (seg_id, offset) = writer.write_vec_seg(&mut buf, &vectors, &ids, 3).unwrap();
        assert_eq!(seg_id, 1);
        assert_eq!(offset, 0);

        let data = buf.into_inner();
        assert!(data.len() > SEGMENT_HEADER_SIZE);

        // The segment magic is the first little-endian u32 of the header.
        let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
        assert_eq!(magic, SEGMENT_MAGIC);

        // seg_type sits at byte offset 0x05 of the header.
        assert_eq!(data[5], SegmentType::Vec as u8);
    }

    #[test]
    fn seg_id_monotonic() {
        let mut writer = SegmentWriter::new(10);
        assert_eq!(writer.alloc_seg_id(), 10);
        assert_eq!(writer.alloc_seg_id(), 11);
        assert_eq!(writer.alloc_seg_id(), 12);
    }
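
    // A minimal sketch exercising `write_journal_seg`: it checks the raw
    // tombstone layout (entry count, epoch, reserved u64, then 12-byte
    // entries). The per-field meanings are read off the writer above, not
    // from a spec.
    #[test]
    fn write_journal_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(5);

        let deleted = [7u64, 9u64];
        let (seg_id, offset) = writer.write_journal_seg(&mut buf, &deleted, 3).unwrap();
        assert_eq!(seg_id, 5);
        assert_eq!(offset, 0);

        let data = buf.into_inner();
        assert_eq!(data[5], SegmentType::Journal as u8);

        let p = SEGMENT_HEADER_SIZE;
        // Entry count, then epoch, then the reserved u64.
        assert_eq!(u32::from_le_bytes(data[p..p + 4].try_into().unwrap()), 2);
        assert_eq!(u32::from_le_bytes(data[p + 4..p + 8].try_into().unwrap()), 3);
        assert_eq!(u64::from_le_bytes(data[p + 8..p + 16].try_into().unwrap()), 0);
        // First entry: tag byte, flags byte, value length, vector ID.
        assert_eq!(data[p + 16], 0x01);
        assert_eq!(data[p + 17], 0x00);
        assert_eq!(u16::from_le_bytes(data[p + 18..p + 20].try_into().unwrap()), 8);
        assert_eq!(u64::from_le_bytes(data[p + 20..p + 28].try_into().unwrap()), 7);
    }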

    #[test]
    fn header_to_bytes_size() {
        let h = SegmentHeader::new(0x01, 42);
        let bytes = header_to_bytes(&h);
        assert_eq!(bytes.len(), SEGMENT_HEADER_SIZE);
    }
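
    // A minimal sketch of the checksum helpers: the bitwise CRC-32 here uses
    // the reflected 0xEDB88320 polynomial, whose standard check value for
    // "123456789" is 0xCBF43926, and `content_hash` tiles byte-rotations of
    // that CRC into a 16-byte digest.
    #[test]
    fn content_hash_is_rotated_crc32() {
        let crc = crc32_slice(b"123456789");
        assert_eq!(crc, 0xCBF4_3926);

        let hash = content_hash(b"123456789");
        for i in 0..4 {
            let lane = u32::from_le_bytes(hash[i * 4..(i + 1) * 4].try_into().unwrap());
            assert_eq!(lane, crc.rotate_left(i as u32 * 8));
        }
    }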

    #[test]
    fn write_kernel_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(1);

        let kernel_header = [0xAAu8; 128];
        let kernel_image = b"fake-kernel-image-data";

        let (seg_id, offset) = writer
            .write_kernel_seg(&mut buf, &kernel_header, kernel_image, Some(b"console=ttyS0"))
            .unwrap();
        assert_eq!(seg_id, 1);
        assert_eq!(offset, 0);

        let data = buf.into_inner();
        assert!(data.len() > SEGMENT_HEADER_SIZE);

        let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
        assert_eq!(magic, SEGMENT_MAGIC);

        assert_eq!(data[5], SegmentType::Kernel as u8);

        // The 128-byte kernel header is copied verbatim at the start of the payload.
        let payload_start = SEGMENT_HEADER_SIZE;
        assert_eq!(&data[payload_start..payload_start + 128], &[0xAAu8; 128]);
    }
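
    // A minimal sketch of the MANIFEST payload prefix written by
    // `write_manifest_seg` (epoch, dimension, total vectors, directory count,
    // profile ID); offsets are read off the writer above, not from a spec.
    #[test]
    fn write_manifest_seg_layout() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(2);

        let dir = [(1u64, 0u64, 512u64, SegmentType::Vec as u8)];
        let deleted = [42u64];
        let (seg_id, _) = writer
            .write_manifest_seg(&mut buf, 7, 3, 100, 1, &dir, &deleted)
            .unwrap();
        assert_eq!(seg_id, 2);

        let data = buf.into_inner();
        assert_eq!(data[5], SegmentType::Manifest as u8);

        let p = SEGMENT_HEADER_SIZE;
        assert_eq!(u32::from_le_bytes(data[p..p + 4].try_into().unwrap()), 7); // epoch
        assert_eq!(u16::from_le_bytes(data[p + 4..p + 6].try_into().unwrap()), 3); // dimension
        assert_eq!(u64::from_le_bytes(data[p + 6..p + 14].try_into().unwrap()), 100); // total vectors
        assert_eq!(u32::from_le_bytes(data[p + 14..p + 18].try_into().unwrap()), 1); // directory count
        assert_eq!(data[p + 18], 1); // profile ID
    }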

    #[test]
    fn write_ebpf_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(10);

        let ebpf_header = [0xBBu8; 64];
        let bytecode = b"ebpf-bytecode";

        let (seg_id, offset) = writer
            .write_ebpf_seg(&mut buf, &ebpf_header, bytecode, None)
            .unwrap();
        assert_eq!(seg_id, 10);
        assert_eq!(offset, 0);

        let data = buf.into_inner();
        assert!(data.len() > SEGMENT_HEADER_SIZE);

        assert_eq!(data[5], SegmentType::Ebpf as u8);

        // The 64-byte eBPF header is copied verbatim at the start of the payload.
        let payload_start = SEGMENT_HEADER_SIZE;
        assert_eq!(&data[payload_start..payload_start + 64], &[0xBBu8; 64]);
    }
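
    // A minimal sketch of the size guard in `write_ebpf_seg`: programs whose
    // bytecode plus BTF data exceed MAX_EBPF_PROGRAM_SIZE should be rejected
    // with `InvalidInput`. Only the error kind is asserted, not the message.
    #[test]
    fn write_ebpf_seg_rejects_oversized_program() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(1);

        let ebpf_header = [0u8; 64];
        let oversized = vec![0u8; SegmentWriter::MAX_EBPF_PROGRAM_SIZE + 1];

        let err = writer
            .write_ebpf_seg(&mut buf, &ebpf_header, &oversized, None)
            .unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }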
}