kstone_core/
sst.rs

1use crate::{Error, Result, Record, Key};
2use bytes::{Bytes, BytesMut, BufMut};
3use std::fs::{File, OpenOptions};
4use std::io::{Read, Write};
5use std::path::{Path, PathBuf};
6
/// Fixed header length: four 32-bit fields (magic, version, count, reserved).
const SST_HEADER_SIZE: usize = 4 * std::mem::size_of::<u32>();
/// File magic; stored big-endian, so the first four bytes on disk read `SST\0`.
const SST_MAGIC: u32 = u32::from_be_bytes(*b"SST\0");
9
10/// Minimal SST for walking skeleton
11/// Format: [magic(4) | version(4) | count(4) | reserved(4)] [record...] [crc(4)]
12/// Records are sorted by key
13pub struct SstWriter {
14    records: Vec<Record>,
15}
16
17impl SstWriter {
18    pub fn new() -> Self {
19        Self {
20            records: Vec::new(),
21        }
22    }
23
24    pub fn add(&mut self, record: Record) {
25        self.records.push(record);
26    }
27
28    pub fn finish(mut self, path: impl AsRef<Path>) -> Result<()> {
29        // Sort records by key
30        self.records.sort_by(|a, b| {
31            let a_enc = a.key.encode();
32            let b_enc = b.key.encode();
33            a_enc.cmp(&b_enc)
34        });
35
36        let mut file = OpenOptions::new()
37            .write(true)
38            .create_new(true)
39            .open(path)?;
40
41        // Write header (big-endian for magic, little-endian for rest)
42        let mut buf = BytesMut::new();
43        buf.put_u32(SST_MAGIC); // big-endian for magic
44        buf.put_u32_le(1); // version
45        buf.put_u32_le(self.records.len() as u32);
46        buf.put_u32_le(0); // reserved
47
48        // Serialize all records
49        let mut data = Vec::new();
50        for record in &self.records {
51            let rec_data = bincode::serialize(record)
52                .map_err(|e| Error::Internal(format!("Serialize error: {}", e)))?;
53            data.extend_from_slice(&(rec_data.len() as u32).to_le_bytes());
54            data.extend_from_slice(&rec_data);
55        }
56
57        buf.put_slice(&data);
58
59        // Write CRC
60        let crc = crc32fast::hash(&data);
61        buf.put_u32_le(crc);
62
63        file.write_all(&buf)?;
64        file.sync_all()?;
65
66        Ok(())
67    }
68}
69
70pub struct SstReader {
71    records: Vec<Record>,
72    path: PathBuf,
73}
74
75impl SstReader {
76    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
77        let mut file = File::open(&path)?;
78
79        // Read header
80        let mut header = [0u8; SST_HEADER_SIZE];
81        file.read_exact(&mut header)?;
82
83        let magic = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
84        if magic != SST_MAGIC {
85            return Err(Error::Corruption("Invalid SST magic".to_string()));
86        }
87
88        let count = u32::from_le_bytes([header[8], header[9], header[10], header[11]]) as usize;
89
90        // Read all data
91        let mut data = Vec::new();
92        file.read_to_end(&mut data)?;
93
94        // Verify CRC (last 4 bytes)
95        if data.len() < 4 {
96            return Err(Error::Corruption("SST file too short".to_string()));
97        }
98
99        let crc_offset = data.len() - 4;
100        let expected_crc = u32::from_le_bytes([
101            data[crc_offset],
102            data[crc_offset + 1],
103            data[crc_offset + 2],
104            data[crc_offset + 3],
105        ]);
106
107        let actual_crc = crc32fast::hash(&data[..crc_offset]);
108        if expected_crc != actual_crc {
109            return Err(Error::ChecksumMismatch);
110        }
111
112        // Deserialize records
113        let mut records = Vec::with_capacity(count);
114        let mut offset = 0;
115        let data = &data[..crc_offset];
116
117        while offset < data.len() {
118            let len = u32::from_le_bytes([
119                data[offset],
120                data[offset + 1],
121                data[offset + 2],
122                data[offset + 3],
123            ]) as usize;
124            offset += 4;
125
126            let record: Record = bincode::deserialize(&data[offset..offset + len])
127                .map_err(|e| Error::Corruption(format!("Deserialize error: {}", e)))?;
128            offset += len;
129
130            records.push(record);
131        }
132
133        if records.len() != count {
134            return Err(Error::Corruption(format!(
135                "Record count mismatch: expected {}, got {}",
136                count,
137                records.len()
138            )));
139        }
140
141        Ok(Self {
142            records,
143            path: path.as_ref().to_path_buf(),
144        })
145    }
146
147    /// Get a record by exact key match
148    pub fn get(&self, key: &Key) -> Option<&Record> {
149        let key_enc = key.encode();
150        self.records
151            .binary_search_by(|rec| rec.key.encode().cmp(&key_enc))
152            .ok()
153            .map(|idx| &self.records[idx])
154    }
155
156    /// Iterate all records
157    pub fn iter(&self) -> impl Iterator<Item = &Record> {
158        self.records.iter()
159    }
160
161    /// Scan records with key prefix
162    pub fn scan_prefix<'a>(&'a self, pk: &'a Bytes) -> impl Iterator<Item = &'a Record> + 'a {
163        self.records
164            .iter()
165            .filter(move |rec| rec.key.pk == *pk)
166    }
167
168    /// Get the path to this SST file
169    pub fn path(&self) -> &Path {
170        &self.path
171    }
172
173    /// Scan all records (returns owned records for compaction)
174    pub fn scan(&self) -> Result<impl Iterator<Item = Record> + '_> {
175        Ok(self.records.iter().cloned())
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;
    use tempfile::TempDir;

    #[test]
    fn test_sst_write_and_read() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        // Write
        {
            let mut writer = SstWriter::new();
            for i in 0..10 {
                let key = Key::new(format!("key{:03}", i).into_bytes());
                let mut item = HashMap::new();
                item.insert("value".to_string(), Value::number(i));
                writer.add(Record::put(key, item, i));
            }
            writer.finish(&path).unwrap();
        }

        // Read
        let reader = SstReader::open(&path).unwrap();
        assert_eq!(reader.records.len(), 10);

        // Get specific key
        let key = Key::new(b"key005".to_vec());
        let rec = reader.get(&key).unwrap();
        assert_eq!(rec.key, key);

        // Iterate
        let count = reader.iter().count();
        assert_eq!(count, 10);
    }

    #[test]
    fn test_sst_sorted() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        // Write in random order
        {
            let mut writer = SstWriter::new();
            for i in [3, 1, 4, 1, 5, 9, 2, 6] {
                let key = Key::new(format!("key{}", i).into_bytes());
                let item = HashMap::new();
                writer.add(Record::put(key, item, i));
            }
            writer.finish(&path).unwrap();
        }

        // Read - should be sorted
        let reader = SstReader::open(&path).unwrap();
        let keys: Vec<_> = reader.iter().map(|r| r.key.pk.clone()).collect();

        let mut sorted_keys = keys.clone();
        sorted_keys.sort();
        assert_eq!(keys, sorted_keys);
    }

    #[test]
    fn test_sst_scan_prefix() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        {
            let mut writer = SstWriter::new();
            writer.add(Record::put(
                Key::with_sk(b"user#1".to_vec(), b"a".to_vec()),
                HashMap::new(),
                1,
            ));
            writer.add(Record::put(
                Key::with_sk(b"user#1".to_vec(), b"b".to_vec()),
                HashMap::new(),
                2,
            ));
            writer.add(Record::put(
                Key::with_sk(b"user#2".to_vec(), b"a".to_vec()),
                HashMap::new(),
                3,
            ));
            writer.finish(&path).unwrap();
        }

        let reader = SstReader::open(&path).unwrap();
        let pk = Bytes::from("user#1");
        let user1_recs: Vec<_> = reader.scan_prefix(&pk).collect();
        assert_eq!(user1_recs.len(), 2);
    }

    #[test]
    fn test_sst_get_missing_key() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        let mut writer = SstWriter::new();
        writer.add(Record::put(Key::new(b"present".to_vec()), HashMap::new(), 1));
        writer.finish(&path).unwrap();

        let reader = SstReader::open(&path).unwrap();
        assert!(reader.get(&Key::new(b"present".to_vec())).is_some());
        assert!(reader.get(&Key::new(b"absent".to_vec())).is_none());
    }

    #[test]
    fn test_sst_empty_roundtrip() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("empty.sst");

        SstWriter::new().finish(&path).unwrap();

        let reader = SstReader::open(&path).unwrap();
        assert_eq!(reader.iter().count(), 0);
        assert!(reader.get(&Key::new(b"anything".to_vec())).is_none());
    }

    #[test]
    fn test_sst_finish_refuses_existing_path() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");
        std::fs::write(&path, b"occupied").unwrap();

        assert!(SstWriter::new().finish(&path).is_err());
        // The pre-existing file must be left untouched.
        assert_eq!(std::fs::read(&path).unwrap(), b"occupied");
    }

    #[test]
    fn test_sst_rejects_bad_magic() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("bad.sst");
        std::fs::write(&path, [0u8; 24]).unwrap();

        assert!(matches!(
            SstReader::open(&path),
            Err(Error::Corruption(_))
        ));
    }

    #[test]
    fn test_sst_detects_checksum_mismatch() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        let mut writer = SstWriter::new();
        writer.add(Record::put(Key::new(b"k".to_vec()), HashMap::new(), 1));
        writer.finish(&path).unwrap();

        // Flip the last byte of the records section (just before the 4-byte CRC).
        let mut bytes = std::fs::read(&path).unwrap();
        let idx = bytes.len() - 5;
        bytes[idx] ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();

        assert!(matches!(
            SstReader::open(&path),
            Err(Error::ChecksumMismatch)
        ));
    }
}
272}