1use crate::{Error, Result, Record, Key};
2use bytes::{Bytes, BytesMut, BufMut};
3use std::fs::{File, OpenOptions};
4use std::io::{Read, Write};
5use std::path::{Path, PathBuf};
6
7const SST_HEADER_SIZE: usize = 16;
8const SST_MAGIC: u32 = 0x53535400; pub struct SstWriter {
14 records: Vec<Record>,
15}
16
17impl SstWriter {
18 pub fn new() -> Self {
19 Self {
20 records: Vec::new(),
21 }
22 }
23
24 pub fn add(&mut self, record: Record) {
25 self.records.push(record);
26 }
27
28 pub fn finish(mut self, path: impl AsRef<Path>) -> Result<()> {
29 self.records.sort_by(|a, b| {
31 let a_enc = a.key.encode();
32 let b_enc = b.key.encode();
33 a_enc.cmp(&b_enc)
34 });
35
36 let mut file = OpenOptions::new()
37 .write(true)
38 .create_new(true)
39 .open(path)?;
40
41 let mut buf = BytesMut::new();
43 buf.put_u32(SST_MAGIC); buf.put_u32_le(1); buf.put_u32_le(self.records.len() as u32);
46 buf.put_u32_le(0); let mut data = Vec::new();
50 for record in &self.records {
51 let rec_data = bincode::serialize(record)
52 .map_err(|e| Error::Internal(format!("Serialize error: {}", e)))?;
53 data.extend_from_slice(&(rec_data.len() as u32).to_le_bytes());
54 data.extend_from_slice(&rec_data);
55 }
56
57 buf.put_slice(&data);
58
59 let crc = crc32fast::hash(&data);
61 buf.put_u32_le(crc);
62
63 file.write_all(&buf)?;
64 file.sync_all()?;
65
66 Ok(())
67 }
68}
69
70pub struct SstReader {
71 records: Vec<Record>,
72 path: PathBuf,
73}
74
75impl SstReader {
76 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
77 let mut file = File::open(&path)?;
78
79 let mut header = [0u8; SST_HEADER_SIZE];
81 file.read_exact(&mut header)?;
82
83 let magic = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
84 if magic != SST_MAGIC {
85 return Err(Error::Corruption("Invalid SST magic".to_string()));
86 }
87
88 let count = u32::from_le_bytes([header[8], header[9], header[10], header[11]]) as usize;
89
90 let mut data = Vec::new();
92 file.read_to_end(&mut data)?;
93
94 if data.len() < 4 {
96 return Err(Error::Corruption("SST file too short".to_string()));
97 }
98
99 let crc_offset = data.len() - 4;
100 let expected_crc = u32::from_le_bytes([
101 data[crc_offset],
102 data[crc_offset + 1],
103 data[crc_offset + 2],
104 data[crc_offset + 3],
105 ]);
106
107 let actual_crc = crc32fast::hash(&data[..crc_offset]);
108 if expected_crc != actual_crc {
109 return Err(Error::ChecksumMismatch);
110 }
111
112 let mut records = Vec::with_capacity(count);
114 let mut offset = 0;
115 let data = &data[..crc_offset];
116
117 while offset < data.len() {
118 let len = u32::from_le_bytes([
119 data[offset],
120 data[offset + 1],
121 data[offset + 2],
122 data[offset + 3],
123 ]) as usize;
124 offset += 4;
125
126 let record: Record = bincode::deserialize(&data[offset..offset + len])
127 .map_err(|e| Error::Corruption(format!("Deserialize error: {}", e)))?;
128 offset += len;
129
130 records.push(record);
131 }
132
133 if records.len() != count {
134 return Err(Error::Corruption(format!(
135 "Record count mismatch: expected {}, got {}",
136 count,
137 records.len()
138 )));
139 }
140
141 Ok(Self {
142 records,
143 path: path.as_ref().to_path_buf(),
144 })
145 }
146
147 pub fn get(&self, key: &Key) -> Option<&Record> {
149 let key_enc = key.encode();
150 self.records
151 .binary_search_by(|rec| rec.key.encode().cmp(&key_enc))
152 .ok()
153 .map(|idx| &self.records[idx])
154 }
155
156 pub fn iter(&self) -> impl Iterator<Item = &Record> {
158 self.records.iter()
159 }
160
161 pub fn scan_prefix<'a>(&'a self, pk: &'a Bytes) -> impl Iterator<Item = &'a Record> + 'a {
163 self.records
164 .iter()
165 .filter(move |rec| rec.key.pk == *pk)
166 }
167
168 pub fn path(&self) -> &Path {
170 &self.path
171 }
172
173 pub fn scan(&self) -> Result<impl Iterator<Item = Record> + '_> {
175 Ok(self.records.iter().cloned())
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Writes ten records, then checks the record count, a point lookup and
    /// full iteration on the reopened file.
    #[test]
    fn test_sst_write_and_read() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        let mut writer = SstWriter::new();
        for i in 0..10 {
            let mut item = HashMap::new();
            item.insert("value".to_string(), Value::number(i));
            let key = Key::new(format!("key{:03}", i).into_bytes());
            writer.add(Record::put(key, item, i));
        }
        writer.finish(&path).unwrap();

        let reader = SstReader::open(&path).unwrap();
        assert_eq!(reader.records.len(), 10);

        let wanted = Key::new(b"key005".to_vec());
        let found = reader.get(&wanted).unwrap();
        assert_eq!(found.key, wanted);

        assert_eq!(reader.iter().count(), 10);
    }

    /// Records added out of order (including a repeated key) come back with
    /// their partition keys in non-decreasing order.
    #[test]
    fn test_sst_sorted() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        let mut writer = SstWriter::new();
        for i in [3, 1, 4, 1, 5, 9, 2, 6] {
            let key = Key::new(format!("key{}", i).into_bytes());
            writer.add(Record::put(key, HashMap::new(), i));
        }
        writer.finish(&path).unwrap();

        let reader = SstReader::open(&path).unwrap();
        let keys: Vec<_> = reader.iter().map(|rec| rec.key.pk.clone()).collect();

        // Equivalent to comparing against a sorted copy of `keys`.
        assert!(keys.windows(2).all(|pair| pair[0] <= pair[1]));
    }

    /// `scan_prefix` yields only the records stored under the requested
    /// partition key.
    #[test]
    fn test_sst_scan_prefix() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("test.sst");

        let mut writer = SstWriter::new();
        let rows = [
            (&b"user#1"[..], &b"a"[..], 1),
            (&b"user#1"[..], &b"b"[..], 2),
            (&b"user#2"[..], &b"a"[..], 3),
        ];
        for (pk, sk, seq) in rows {
            writer.add(Record::put(
                Key::with_sk(pk.to_vec(), sk.to_vec()),
                HashMap::new(),
                seq,
            ));
        }
        writer.finish(&path).unwrap();

        let reader = SstReader::open(&path).unwrap();
        let pk = Bytes::from("user#1");
        assert_eq!(reader.scan_prefix(&pk).count(), 2);
    }
}