siftdb_core/
storage.rs

use std::convert::TryFrom;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use anyhow::Result;

use crate::types::{FileHeader, Frame, HandleMetadata};
6
7/// Segment writer for append-only storage
8pub struct SegmentWriter {
9    file: File,
10    #[allow(dead_code)]
11    path: PathBuf,
12    current_offset: u64,
13    seg_id: u32,
14}
15
16impl SegmentWriter {
17    pub fn new(store_path: &Path, seg_id: u32) -> Result<Self> {
18        let path = store_path.join(format!("seg-{:06}.sift", seg_id));
19        let file = OpenOptions::new()
20            .create(true)
21            .append(true)
22            .open(&path)?;
23        
24        // Get current file size to track offset
25        let current_offset = file.metadata()?.len();
26        
27        Ok(Self {
28            file,
29            path,
30            current_offset,
31            seg_id,
32        })
33    }
34    
35    pub fn write_frame(&mut self, frame: &Frame) -> Result<HandleMetadata> {
36        let start_offset = self.current_offset;
37        
38        // Calculate frame length (excluding the frame_len field itself)
39        let frame_content_len = frame.frame_size() - 4;
40        
41        // Write frame_len
42        self.file.write_all(&(frame_content_len as u32).to_le_bytes())?;
43        
44        // Write header_crc32
45        let header_bytes = frame.header.to_bytes();
46        let header_crc = crc32fast::hash(&header_bytes);
47        self.file.write_all(&header_crc.to_le_bytes())?;
48        
49        // Write FileHeader (64 bytes)
50        self.file.write_all(&header_bytes)?;
51        
52        // Write content_crc32
53        let content_crc = crc32fast::hash(&frame.content);
54        self.file.write_all(&content_crc.to_le_bytes())?;
55        
56        // Write content
57        self.file.write_all(&frame.content)?;
58        
59        // Write line table
60        let line_tbl_offset = (self.current_offset + 4 + 4 + 64 + 4 + frame.content.len() as u64) as u32;
61        self.file.write_all(&frame.line_table)?;
62        
63        // Write frame_crc32 (CRC of everything except frame_len and this CRC itself)
64        let mut frame_data = Vec::new();
65        frame_data.extend_from_slice(&header_crc.to_le_bytes());
66        frame_data.extend_from_slice(&header_bytes);
67        frame_data.extend_from_slice(&content_crc.to_le_bytes());
68        frame_data.extend_from_slice(&frame.content);
69        frame_data.extend_from_slice(&frame.line_table);
70        let frame_crc = crc32fast::hash(&frame_data);
71        self.file.write_all(&frame_crc.to_le_bytes())?;
72        
73        // Pad to 4KiB alignment
74        let total_written = frame.frame_size();
75        let padded_size = frame.padded_frame_size();
76        let padding_needed = padded_size - total_written;
77        if padding_needed > 0 {
78            let padding = vec![0u8; padding_needed];
79            self.file.write_all(&padding)?;
80        }
81        
82        self.file.flush()?;
83        
84        let metadata = HandleMetadata {
85            seg_id: self.seg_id,
86            offset: start_offset,
87            frame_len: padded_size as u32,
88            line_tbl_offset,
89        };
90        
91        self.current_offset += padded_size as u64;
92        
93        Ok(metadata)
94    }
95    
96    pub fn current_offset(&self) -> u64 {
97        self.current_offset
98    }
99    
100    pub fn seg_id(&self) -> u32 {
101        self.seg_id
102    }
103}
104
105/// Segment reader for reading frames
106pub struct SegmentReader {
107    file: File,
108    #[allow(dead_code)]
109    seg_id: u32,
110}
111
112impl SegmentReader {
113    pub fn new(store_path: &Path, seg_id: u32) -> Result<Self> {
114        let path = store_path.join(format!("seg-{:06}.sift", seg_id));
115        let file = File::open(path)?;
116        
117        Ok(Self { file, seg_id })
118    }
119    
120    pub fn read_frame(&mut self, metadata: &HandleMetadata) -> Result<Frame> {
121        // Seek to frame start
122        self.file.seek(SeekFrom::Start(metadata.offset))?;
123        
124        // Read frame_len
125        let mut buf = [0u8; 4];
126        self.file.read_exact(&mut buf)?;
127        let _frame_len = u32::from_le_bytes(buf);
128        
129        // Read header_crc32
130        self.file.read_exact(&mut buf)?;
131        let expected_header_crc = u32::from_le_bytes(buf);
132        
133        // Read FileHeader (64 bytes)
134        let mut header_buf = [0u8; 64];
135        self.file.read_exact(&mut header_buf)?;
136        let actual_header_crc = crc32fast::hash(&header_buf);
137        
138        if actual_header_crc != expected_header_crc {
139            anyhow::bail!("Header CRC mismatch");
140        }
141        
142        let header = FileHeader::from_bytes(&header_buf)?;
143        
144        // Read content_crc32
145        self.file.read_exact(&mut buf)?;
146        let expected_content_crc = u32::from_le_bytes(buf);
147        
148        // Read content
149        let mut content = vec![0u8; header.content_len as usize];
150        self.file.read_exact(&mut content)?;
151        let actual_content_crc = crc32fast::hash(&content);
152        
153        if actual_content_crc != expected_content_crc {
154            anyhow::bail!("Content CRC mismatch");
155        }
156        
157        // Read line table
158        let mut line_table = vec![0u8; header.line_tbl_len as usize];
159        self.file.read_exact(&mut line_table)?;
160        
161        // Verify frame CRC
162        self.file.read_exact(&mut buf)?;
163        let expected_frame_crc = u32::from_le_bytes(buf);
164        
165        let mut frame_data = Vec::new();
166        frame_data.extend_from_slice(&expected_header_crc.to_le_bytes());
167        frame_data.extend_from_slice(&header_buf);
168        frame_data.extend_from_slice(&expected_content_crc.to_le_bytes());
169        frame_data.extend_from_slice(&content);
170        frame_data.extend_from_slice(&line_table);
171        let actual_frame_crc = crc32fast::hash(&frame_data);
172        
173        if actual_frame_crc != expected_frame_crc {
174            anyhow::bail!("Frame CRC mismatch");
175        }
176        
177        Ok(Frame {
178            header,
179            content,
180            line_table,
181        })
182    }
183}
184
185/// Generate line table from content (delta-encoded varint positions of newlines)
186pub fn generate_line_table(content: &[u8]) -> Vec<u8> {
187    let mut positions = Vec::new();
188    
189    // Find all newline positions
190    for (i, &byte) in content.iter().enumerate() {
191        if byte == b'\n' {
192            positions.push(i as u32);
193        }
194    }
195    
196    // Delta-encode and write as varints
197    let mut line_table = Vec::new();
198    let mut last_pos = 0u32;
199    
200    for pos in positions {
201        let delta = pos - last_pos;
202        write_varint(&mut line_table, delta as u64);
203        last_pos = pos;
204    }
205    
206    line_table
207}
208
209/// Decode line table to get newline positions
210pub fn decode_line_table(line_table: &[u8]) -> Result<Vec<u32>> {
211    let mut positions = Vec::new();
212    let mut cursor = 0;
213    let mut current_pos = 0u32;
214    
215    while cursor < line_table.len() {
216        let (delta, bytes_read) = read_varint(&line_table[cursor..])?;
217        current_pos += delta as u32;
218        positions.push(current_pos);
219        cursor += bytes_read;
220    }
221    
222    Ok(positions)
223}
224
/// Maps a byte offset to its 1-based line number.
///
/// `newline_positions` must be sorted ascending (as produced by decoding a
/// line table). The result is one plus the number of newlines strictly before
/// `byte_offset`, so a newline byte itself belongs to the line it terminates.
pub fn byte_to_line(byte_offset: usize, newline_positions: &[u32]) -> u32 {
    let newlines_before = match u32::try_from(byte_offset) {
        // An exact hit and an insertion point both equal the number of
        // newlines located before the offset.
        Ok(offset) => match newline_positions.binary_search(&offset) {
            Ok(index) | Err(index) => index,
        },
        // Offsets beyond the u32 range lie past every recorded newline; a
        // truncating cast would wrap them back into range instead.
        Err(_) => newline_positions.len(),
    };

    (newlines_before + 1) as u32
}
233
// Varint encoding/decoding utilities

/// Appends `value` to `buf` as a base-128 varint: seven payload bits per byte,
/// least-significant group first, with the high bit set on every byte except
/// the last.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low_bits = (value & 0x7F) as u8;
        value >>= 7;

        if value == 0 {
            // Final group: the continuation bit stays clear.
            buf.push(low_bits);
            return;
        }

        buf.push(low_bits | 0x80);
    }
}
242
243fn read_varint(data: &[u8]) -> Result<(u64, usize)> {
244    let mut result = 0u64;
245    let mut shift = 0;
246    let mut bytes_read = 0;
247    
248    for &byte in data {
249        bytes_read += 1;
250        result |= ((byte & 0x7F) as u64) << shift;
251        
252        if (byte & 0x80) == 0 {
253            return Ok((result, bytes_read));
254        }
255        
256        shift += 7;
257        if shift >= 64 {
258            anyhow::bail!("Varint too large");
259        }
260    }
261    
262    anyhow::bail!("Incomplete varint");
263}