// rvf_runtime/write_path.rs

//! Append-only write logic for the RVF runtime.
//!
//! All mutations append new segments to the file. The write path:
//! 1. Allocate segment_id (monotonic counter)
//! 2. Build payload (VEC_SEG, META_SEG, JOURNAL_SEG, etc.)
//! 3. Write segment header + payload, fsync
//! 4. Build new MANIFEST_SEG, fsync (two-fsync protocol)

9use rvf_types::{SegmentHeader, SegmentType, SEGMENT_HEADER_SIZE};
10use std::io::{self, Seek, Write};
11
/// Segment writer that handles the append-only write protocol.
///
/// Holds only the monotonic segment-ID counter; the destination is supplied
/// per call as any `Write + Seek` sink, so one writer can serve multiple
/// targets (files, in-memory cursors in tests, ...).
pub(crate) struct SegmentWriter {
    /// Next segment ID to assign (monotonic counter).
    next_seg_id: u64,
}
17
18impl SegmentWriter {
19    pub(crate) fn new(starting_id: u64) -> Self {
20        Self {
21            next_seg_id: starting_id,
22        }
23    }
24
25    /// Allocate a new segment ID.
26    ///
27    /// Uses checked arithmetic to detect overflow (would require 2^64 segments).
28    pub(crate) fn alloc_seg_id(&mut self) -> u64 {
29        let id = self.next_seg_id;
30        self.next_seg_id = self.next_seg_id.checked_add(1)
31            .expect("segment ID counter overflow");
32        id
33    }
34
35    /// Write a VEC_SEG containing the given f32 vectors.
36    ///
37    /// Returns the segment ID and byte offset where it was written.
38    pub(crate) fn write_vec_seg<W: Write + Seek>(
39        &mut self,
40        writer: &mut W,
41        vectors: &[&[f32]],
42        ids: &[u64],
43        dimension: u16,
44    ) -> io::Result<(u64, u64)> {
45        let seg_id = self.alloc_seg_id();
46
47        // Build payload: dimension(u16) + vector_count(u32) + [id(u64) + data(f32 * dim)]
48        let vector_count = vectors.len() as u32;
49        let bytes_per_vec = (dimension as usize) * 4;
50        let payload_size = 2 + 4 + (vectors.len() * (8 + bytes_per_vec));
51        let mut payload = Vec::with_capacity(payload_size);
52
53        payload.extend_from_slice(&dimension.to_le_bytes());
54        payload.extend_from_slice(&vector_count.to_le_bytes());
55        for (vec_data, &vec_id) in vectors.iter().zip(ids.iter()) {
56            payload.extend_from_slice(&vec_id.to_le_bytes());
57            for &val in *vec_data {
58                payload.extend_from_slice(&val.to_le_bytes());
59            }
60        }
61
62        let offset = self.write_segment(writer, SegmentType::Vec as u8, seg_id, &payload)?;
63        Ok((seg_id, offset))
64    }
65
66    /// Write a JOURNAL_SEG with tombstone entries for deleted vector IDs.
67    pub(crate) fn write_journal_seg<W: Write + Seek>(
68        &mut self,
69        writer: &mut W,
70        deleted_ids: &[u64],
71        epoch: u32,
72    ) -> io::Result<(u64, u64)> {
73        let seg_id = self.alloc_seg_id();
74
75        // Journal header (simplified): entry_count(u32) + epoch(u32) + prev_seg_id(u64)
76        // Then entries: each is entry_type(u8) + pad(u8) + len(u16) + vector_id(u64)
77        let entry_count = deleted_ids.len() as u32;
78        let payload_size = 16 + (deleted_ids.len() * 12); // header + entries
79        let mut payload = Vec::with_capacity(payload_size);
80
81        // Journal header.
82        payload.extend_from_slice(&entry_count.to_le_bytes());
83        payload.extend_from_slice(&epoch.to_le_bytes());
84        payload.extend_from_slice(&0u64.to_le_bytes()); // prev_journal_seg_id
85
86        // Entries: DELETE_VECTOR (type 0x01).
87        for &vid in deleted_ids {
88            payload.push(0x01); // DELETE_VECTOR
89            payload.push(0x00); // reserved
90            payload.extend_from_slice(&8u16.to_le_bytes()); // entry_length
91            payload.extend_from_slice(&vid.to_le_bytes());
92        }
93
94        let offset = self.write_segment(writer, SegmentType::Journal as u8, seg_id, &payload)?;
95        Ok((seg_id, offset))
96    }
97
98    /// Write a META_SEG for vector metadata.
99    #[allow(dead_code)]
100    pub(crate) fn write_meta_seg<W: Write + Seek>(
101        &mut self,
102        writer: &mut W,
103        metadata_payload: &[u8],
104    ) -> io::Result<(u64, u64)> {
105        let seg_id = self.alloc_seg_id();
106        let offset = self.write_segment(writer, SegmentType::Meta as u8, seg_id, metadata_payload)?;
107        Ok((seg_id, offset))
108    }
109
110    /// Write a minimal MANIFEST_SEG recording current state.
111    ///
112    /// This is a simplified manifest that stores:
113    /// - epoch, dimension, total_vectors, total_segments, profile_id
114    /// - segment directory entries (seg_id, offset, length, type)
115    /// - deletion bitmap (vector IDs as simple packed u64 array)
116    /// - file identity (68 bytes, appended for lineage provenance)
117    #[allow(clippy::too_many_arguments, dead_code)]
118    pub(crate) fn write_manifest_seg<W: Write + Seek>(
119        &mut self,
120        writer: &mut W,
121        epoch: u32,
122        dimension: u16,
123        total_vectors: u64,
124        profile_id: u8,
125        segment_dir: &[(u64, u64, u64, u8)], // (seg_id, offset, payload_len, seg_type)
126        deleted_ids: &[u64],
127    ) -> io::Result<(u64, u64)> {
128        self.write_manifest_seg_with_identity(
129            writer, epoch, dimension, total_vectors, profile_id,
130            segment_dir, deleted_ids, None,
131        )
132    }
133
134    /// Write a MANIFEST_SEG with optional FileIdentity appended.
135    #[allow(clippy::too_many_arguments)]
136    pub(crate) fn write_manifest_seg_with_identity<W: Write + Seek>(
137        &mut self,
138        writer: &mut W,
139        epoch: u32,
140        dimension: u16,
141        total_vectors: u64,
142        profile_id: u8,
143        segment_dir: &[(u64, u64, u64, u8)],
144        deleted_ids: &[u64],
145        file_identity: Option<&rvf_types::FileIdentity>,
146    ) -> io::Result<(u64, u64)> {
147        let seg_id = self.alloc_seg_id();
148
149        // Build manifest payload.
150        let seg_count = segment_dir.len() as u32;
151        let del_count = deleted_ids.len() as u32;
152        let payload_size = 4 + 2 + 8 + 4 + 1 + 3 // header fields
153            + (segment_dir.len() * (8 + 8 + 8 + 1)) // directory
154            + 4 + (deleted_ids.len() * 8) // deletion bitmap
155            + if file_identity.is_some() { 4 + 68 } else { 0 }; // lineage marker + identity
156
157        let mut payload = Vec::with_capacity(payload_size);
158
159        // Manifest header.
160        payload.extend_from_slice(&epoch.to_le_bytes());
161        payload.extend_from_slice(&dimension.to_le_bytes());
162        payload.extend_from_slice(&total_vectors.to_le_bytes());
163        payload.extend_from_slice(&seg_count.to_le_bytes());
164        payload.push(profile_id);
165        payload.extend_from_slice(&[0u8; 3]); // reserved
166
167        // Segment directory.
168        for &(sid, off, plen, stype) in segment_dir {
169            payload.extend_from_slice(&sid.to_le_bytes());
170            payload.extend_from_slice(&off.to_le_bytes());
171            payload.extend_from_slice(&plen.to_le_bytes());
172            payload.push(stype);
173        }
174
175        // Deletion bitmap (simplified: count + packed IDs).
176        payload.extend_from_slice(&del_count.to_le_bytes());
177        for &did in deleted_ids {
178            payload.extend_from_slice(&did.to_le_bytes());
179        }
180
181        // FileIdentity (optional, backward-compatible trailer).
182        // Magic marker 0x46494449 ("FIDI") followed by 68-byte identity.
183        if let Some(fi) = file_identity {
184            payload.extend_from_slice(&0x4649_4449u32.to_le_bytes()); // "FIDI"
185            payload.extend_from_slice(&fi.to_bytes());
186        }
187
188        let offset = self.write_segment(writer, SegmentType::Manifest as u8, seg_id, &payload)?;
189        Ok((seg_id, offset))
190    }
191
192    /// Maximum kernel image size (128 MiB) to prevent DoS via oversized segments.
193    #[allow(dead_code)]
194    const MAX_KERNEL_IMAGE_SIZE: usize = 128 * 1024 * 1024;
195
196    /// Write a KERNEL_SEG containing a compressed kernel image.
197    ///
198    /// Payload layout: `kernel_header_bytes` (128) + `kernel_image` + optional `cmdline`.
199    /// Returns the segment ID and byte offset where it was written.
200    ///
201    /// Returns an error if the kernel image exceeds 128 MiB.
202    #[allow(dead_code)]
203    pub(crate) fn write_kernel_seg<W: Write + Seek>(
204        &mut self,
205        writer: &mut W,
206        kernel_header_bytes: &[u8; 128],
207        kernel_image: &[u8],
208        cmdline: Option<&[u8]>,
209    ) -> io::Result<(u64, u64)> {
210        if kernel_image.len() > Self::MAX_KERNEL_IMAGE_SIZE {
211            return Err(io::Error::new(
212                io::ErrorKind::InvalidInput,
213                format!(
214                    "kernel image too large: {} bytes (max {})",
215                    kernel_image.len(),
216                    Self::MAX_KERNEL_IMAGE_SIZE
217                ),
218            ));
219        }
220
221        let seg_id = self.alloc_seg_id();
222
223        let cmdline_len = cmdline.map_or(0, |c| c.len());
224        let payload_size = 128 + kernel_image.len() + cmdline_len;
225        let mut payload = Vec::with_capacity(payload_size);
226
227        payload.extend_from_slice(kernel_header_bytes);
228        payload.extend_from_slice(kernel_image);
229        if let Some(cl) = cmdline {
230            payload.extend_from_slice(cl);
231        }
232
233        let offset = self.write_segment(writer, SegmentType::Kernel as u8, seg_id, &payload)?;
234        Ok((seg_id, offset))
235    }
236
237    /// Maximum eBPF program size (16 MiB) to prevent DoS via oversized segments.
238    #[allow(dead_code)]
239    const MAX_EBPF_PROGRAM_SIZE: usize = 16 * 1024 * 1024;
240
241    /// Write an EBPF_SEG containing eBPF program bytecode and optional BTF data.
242    ///
243    /// Payload layout: `ebpf_header_bytes` (64) + `program_bytecode` + optional `btf_data`.
244    /// Returns the segment ID and byte offset where it was written.
245    ///
246    /// Returns an error if the combined bytecode + BTF data exceeds 16 MiB.
247    #[allow(dead_code)]
248    pub(crate) fn write_ebpf_seg<W: Write + Seek>(
249        &mut self,
250        writer: &mut W,
251        ebpf_header_bytes: &[u8; 64],
252        program_bytecode: &[u8],
253        btf_data: Option<&[u8]>,
254    ) -> io::Result<(u64, u64)> {
255        let btf_len = btf_data.map_or(0, |b| b.len());
256        let total_program_size = program_bytecode.len().saturating_add(btf_len);
257        if total_program_size > Self::MAX_EBPF_PROGRAM_SIZE {
258            return Err(io::Error::new(
259                io::ErrorKind::InvalidInput,
260                format!(
261                    "eBPF program too large: {} bytes (max {})",
262                    total_program_size,
263                    Self::MAX_EBPF_PROGRAM_SIZE
264                ),
265            ));
266        }
267
268        let seg_id = self.alloc_seg_id();
269
270        let payload_size = 64 + program_bytecode.len() + btf_len;
271        let mut payload = Vec::with_capacity(payload_size);
272
273        payload.extend_from_slice(ebpf_header_bytes);
274        payload.extend_from_slice(program_bytecode);
275        if let Some(btf) = btf_data {
276            payload.extend_from_slice(btf);
277        }
278
279        let offset = self.write_segment(writer, SegmentType::Ebpf as u8, seg_id, &payload)?;
280        Ok((seg_id, offset))
281    }
282
283    /// Low-level: write a segment header + payload to the writer.
284    /// Returns the byte offset where the segment was written.
285    fn write_segment<W: Write + Seek>(
286        &self,
287        writer: &mut W,
288        seg_type: u8,
289        seg_id: u64,
290        payload: &[u8],
291    ) -> io::Result<u64> {
292        let offset = writer.stream_position()?;
293
294        let mut header = SegmentHeader::new(seg_type, seg_id);
295        header.payload_length = payload.len() as u64;
296
297        // Compute a simple content hash (first 16 bytes of CRC-based hash).
298        let hash = content_hash(payload);
299        header.content_hash = hash;
300
301        // Write header as raw bytes.
302        let header_bytes = header_to_bytes(&header);
303        writer.write_all(&header_bytes)?;
304
305        // Write payload.
306        writer.write_all(payload)?;
307
308        Ok(offset)
309    }
310
311    /// Current next segment ID.
312    #[allow(dead_code)]
313    pub(crate) fn next_id(&self) -> u64 {
314        self.next_seg_id
315    }
316}
317
318/// Convert a SegmentHeader to its 64-byte wire representation.
319fn header_to_bytes(h: &SegmentHeader) -> [u8; SEGMENT_HEADER_SIZE] {
320    let mut buf = [0u8; SEGMENT_HEADER_SIZE];
321    buf[0x00..0x04].copy_from_slice(&h.magic.to_le_bytes());
322    buf[0x04] = h.version;
323    buf[0x05] = h.seg_type;
324    buf[0x06..0x08].copy_from_slice(&h.flags.to_le_bytes());
325    buf[0x08..0x10].copy_from_slice(&h.segment_id.to_le_bytes());
326    buf[0x10..0x18].copy_from_slice(&h.payload_length.to_le_bytes());
327    buf[0x18..0x20].copy_from_slice(&h.timestamp_ns.to_le_bytes());
328    buf[0x20] = h.checksum_algo;
329    buf[0x21] = h.compression;
330    buf[0x22..0x24].copy_from_slice(&h.reserved_0.to_le_bytes());
331    buf[0x24..0x28].copy_from_slice(&h.reserved_1.to_le_bytes());
332    buf[0x28..0x38].copy_from_slice(&h.content_hash);
333    buf[0x38..0x3C].copy_from_slice(&h.uncompressed_len.to_le_bytes());
334    buf[0x3C..0x40].copy_from_slice(&h.alignment_pad.to_le_bytes());
335    buf
336}
337
/// Compute a simple 16-byte content hash (CRC32-based, rotated for distinct bytes).
///
/// The four 4-byte words of the result are the CRC rotated left by 0, 8, 16,
/// and 24 bits respectively, serialized little-endian.
fn content_hash(data: &[u8]) -> [u8; 16] {
    let crc = crc32_slice(data);
    let mut hash = [0u8; 16];
    for (i, word) in hash.chunks_exact_mut(4).enumerate() {
        word.copy_from_slice(&crc.rotate_left((i * 8) as u32).to_le_bytes());
    }
    hash
}

/// Simple CRC32 computation (reflected, polynomial 0xEDB88320, init/xorout
/// 0xFFFFFFFF — the common "CRC-32/ISO-HDLC" variant).
fn crc32_slice(data: &[u8]) -> u32 {
    !data.iter().fold(0xFFFF_FFFFu32, |acc, &byte| {
        let mut crc = acc ^ u32::from(byte);
        for _ in 0..8 {
            let lsb_set = crc & 1 != 0;
            crc >>= 1;
            if lsb_set {
                crc ^= 0xEDB8_8320;
            }
        }
        crc
    })
}
365
#[cfg(test)]
mod tests {
    use super::*;
    use rvf_types::SEGMENT_MAGIC;
    use std::io::Cursor;

    /// Write one VEC_SEG into an in-memory buffer and validate the raw
    /// header bytes (magic at offset 0, seg_type at offset 5).
    #[test]
    fn write_vec_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(1);

        let v1: Vec<f32> = vec![1.0, 2.0, 3.0];
        let v2: Vec<f32> = vec![4.0, 5.0, 6.0];
        let vectors: Vec<&[f32]> = vec![&v1, &v2];
        let ids = vec![10u64, 20u64];

        let (seg_id, offset) = writer.write_vec_seg(&mut buf, &vectors, &ids, 3).unwrap();
        assert_eq!(seg_id, 1);
        assert_eq!(offset, 0);

        // Verify the data was written.
        let data = buf.into_inner();
        assert!(data.len() > SEGMENT_HEADER_SIZE);

        // Check magic.
        let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
        assert_eq!(magic, SEGMENT_MAGIC);

        // Check seg_type.
        assert_eq!(data[5], SegmentType::Vec as u8);
    }

    /// IDs must start at the seed value and increase by one per allocation.
    #[test]
    fn seg_id_monotonic() {
        let mut writer = SegmentWriter::new(10);
        assert_eq!(writer.alloc_seg_id(), 10);
        assert_eq!(writer.alloc_seg_id(), 11);
        assert_eq!(writer.alloc_seg_id(), 12);
    }

    /// The serialized header must be exactly SEGMENT_HEADER_SIZE bytes.
    #[test]
    fn header_to_bytes_size() {
        let h = SegmentHeader::new(0x01, 42);
        let bytes = header_to_bytes(&h);
        assert_eq!(bytes.len(), SEGMENT_HEADER_SIZE);
    }

    /// KERNEL_SEG round trip: header magic/type are correct and the payload
    /// begins with the 128-byte kernel header passed in.
    #[test]
    fn write_kernel_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(1);

        let kernel_header = [0xAAu8; 128];
        let kernel_image = b"fake-kernel-image-data";

        let (seg_id, offset) = writer
            .write_kernel_seg(&mut buf, &kernel_header, kernel_image, Some(b"console=ttyS0"))
            .unwrap();
        assert_eq!(seg_id, 1);
        assert_eq!(offset, 0);

        let data = buf.into_inner();
        assert!(data.len() > SEGMENT_HEADER_SIZE);

        // Check magic.
        let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
        assert_eq!(magic, SEGMENT_MAGIC);

        // Check seg_type == Kernel (0x0E).
        assert_eq!(data[5], SegmentType::Kernel as u8);

        // Verify payload starts with kernel header bytes.
        let payload_start = SEGMENT_HEADER_SIZE;
        assert_eq!(&data[payload_start..payload_start + 128], &[0xAAu8; 128]);
    }

    /// EBPF_SEG round trip with no BTF data: seg_type is correct and the
    /// payload begins with the 64-byte eBPF header passed in.
    #[test]
    fn write_ebpf_seg_round_trip() {
        let mut buf = Cursor::new(Vec::new());
        let mut writer = SegmentWriter::new(10);

        let ebpf_header = [0xBBu8; 64];
        let bytecode = b"ebpf-bytecode";

        let (seg_id, offset) = writer
            .write_ebpf_seg(&mut buf, &ebpf_header, bytecode, None)
            .unwrap();
        assert_eq!(seg_id, 10);
        assert_eq!(offset, 0);

        let data = buf.into_inner();
        assert!(data.len() > SEGMENT_HEADER_SIZE);

        // Check seg_type == Ebpf (0x0F).
        assert_eq!(data[5], SegmentType::Ebpf as u8);

        // Verify payload starts with eBPF header bytes.
        let payload_start = SEGMENT_HEADER_SIZE;
        assert_eq!(&data[payload_start..payload_start + 64], &[0xBBu8; 64]);
    }
}