kstone_core/
memory_sst.rs1use crate::{Record, Result, Key, bloom::BloomFilter};
7use bytes::Bytes;
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11pub struct MemorySstWriter {
13 records: Vec<Record>,
14}
15
16impl MemorySstWriter {
17 pub fn new() -> Self {
19 Self {
20 records: Vec::new(),
21 }
22 }
23
24 pub fn add(&mut self, record: Record) {
26 self.records.push(record);
27 }
28
29 pub fn finish(mut self, name: impl Into<String>) -> Result<MemorySstReader> {
34 self.records.sort_by(|a, b| a.key.encode().cmp(&b.key.encode()));
36
37 let mut bloom = BloomFilter::new(self.records.len().max(1000), 10);
39 for record in &self.records {
40 bloom.add(&record.key.encode());
41 }
42
43 Ok(MemorySstReader {
44 name: name.into(),
45 records: self.records,
46 bloom,
47 })
48 }
49}
50
51impl Default for MemorySstWriter {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57#[derive(Clone)]
59pub struct MemorySstReader {
60 name: String,
61 records: Vec<Record>,
62 bloom: BloomFilter,
63}
64
65impl MemorySstReader {
66 pub fn empty(name: impl Into<String>) -> Self {
68 Self {
69 name: name.into(),
70 records: Vec::new(),
71 bloom: BloomFilter::new(1000, 10),
72 }
73 }
74
75 pub fn get(&self, key: &Key) -> Option<&Record> {
77 let key_bytes = key.encode();
78
79 if !self.bloom.contains(&key_bytes) {
81 return None;
82 }
83
84 self.records
86 .binary_search_by(|r| r.key.encode().as_ref().cmp(key_bytes.as_ref()))
87 .ok()
88 .map(|idx| &self.records[idx])
89 }
90
91 pub fn iter(&self) -> impl Iterator<Item = &Record> {
93 self.records.iter()
94 }
95
96 pub fn scan_prefix<'a>(&'a self, pk: &'a Bytes) -> impl Iterator<Item = &'a Record> + 'a {
98 self.records.iter().filter(move |r| r.key.pk == *pk)
99 }
100
101 pub fn name(&self) -> &str {
103 &self.name
104 }
105
106 pub fn scan(&self) -> Result<impl Iterator<Item = Record> + '_> {
108 Ok(self.records.iter().cloned())
109 }
110
111 pub fn len(&self) -> usize {
113 self.records.len()
114 }
115
116 pub fn is_empty(&self) -> bool {
118 self.records.is_empty()
119 }
120}
121
122#[derive(Clone)]
126pub struct MemorySstStore {
127 ssts: Arc<Mutex<HashMap<String, MemorySstReader>>>,
128}
129
130impl MemorySstStore {
131 pub fn new() -> Self {
133 Self {
134 ssts: Arc::new(Mutex::new(HashMap::new())),
135 }
136 }
137
138 pub fn store(&self, name: impl Into<String>, sst: MemorySstReader) {
140 let mut ssts = self.ssts.lock().unwrap();
141 ssts.insert(name.into(), sst);
142 }
143
144 pub fn get(&self, name: &str) -> Option<MemorySstReader> {
146 let ssts = self.ssts.lock().unwrap();
147 ssts.get(name).cloned()
148 }
149
150 pub fn delete(&self, name: &str) -> bool {
152 let mut ssts = self.ssts.lock().unwrap();
153 ssts.remove(name).is_some()
154 }
155
156 pub fn list_names(&self) -> Vec<String> {
158 let ssts = self.ssts.lock().unwrap();
159 ssts.keys().cloned().collect()
160 }
161
162 pub fn len(&self) -> usize {
164 let ssts = self.ssts.lock().unwrap();
165 ssts.len()
166 }
167
168 pub fn is_empty(&self) -> bool {
170 self.len() == 0
171 }
172
173 pub fn clear(&self) {
175 let mut ssts = self.ssts.lock().unwrap();
176 ssts.clear();
177 }
178}
179
180impl Default for MemorySstStore {
181 fn default() -> Self {
182 Self::new()
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189 use crate::Value;
190 use std::collections::HashMap;
191
192 fn create_test_record(pk: &[u8], seq: u64) -> Record {
193 let mut item = HashMap::new();
194 item.insert("test".to_string(), Value::string("value"));
195 Record::put(Key::new(pk.to_vec()), item, seq)
196 }
197
198 #[test]
199 fn test_memory_sst_writer_finish() {
200 let mut writer = MemorySstWriter::new();
201 writer.add(create_test_record(b"key1", 1));
202 writer.add(create_test_record(b"key2", 2));
203
204 let reader = writer.finish("test.sst").unwrap();
205 assert_eq!(reader.len(), 2);
206 assert_eq!(reader.name(), "test.sst");
207 }
208
209 #[test]
210 fn test_memory_sst_reader_get() {
211 let mut writer = MemorySstWriter::new();
212 let key1 = Key::new(b"key1".to_vec());
213 writer.add(create_test_record(b"key1", 1));
214
215 let reader = writer.finish("test.sst").unwrap();
216
217 let record = reader.get(&key1);
218 assert!(record.is_some());
219 assert_eq!(record.unwrap().seq, 1);
220
221 let missing = reader.get(&Key::new(b"missing".to_vec()));
222 assert!(missing.is_none());
223 }
224
225 #[test]
226 fn test_memory_sst_reader_iter() {
227 let mut writer = MemorySstWriter::new();
228 writer.add(create_test_record(b"key1", 1));
229 writer.add(create_test_record(b"key2", 2));
230 writer.add(create_test_record(b"key3", 3));
231
232 let reader = writer.finish("test.sst").unwrap();
233
234 let records: Vec<_> = reader.iter().collect();
235 assert_eq!(records.len(), 3);
236 }
237
238 #[test]
239 fn test_memory_sst_reader_scan_prefix() {
240 let mut writer = MemorySstWriter::new();
241 writer.add(create_test_record(b"user#1", 1));
242 writer.add(create_test_record(b"user#2", 2));
243 writer.add(create_test_record(b"post#1", 3));
244
245 let reader = writer.finish("test.sst").unwrap();
246
247 let pk = Bytes::from_static(b"user#1");
248 let records: Vec<_> = reader.scan_prefix(&pk).collect();
249 assert_eq!(records.len(), 1);
250 assert_eq!(records[0].key.pk, pk);
251 }
252
253 #[test]
254 fn test_memory_sst_store() {
255 let store = MemorySstStore::new();
256
257 let mut writer = MemorySstWriter::new();
258 writer.add(create_test_record(b"key1", 1));
259 let sst = writer.finish("test.sst").unwrap();
260
261 store.store("test.sst", sst);
262 assert_eq!(store.len(), 1);
263
264 let retrieved = store.get("test.sst");
265 assert!(retrieved.is_some());
266 assert_eq!(retrieved.unwrap().len(), 1);
267
268 store.delete("test.sst");
269 assert!(store.is_empty());
270 }
271
272 #[test]
273 fn test_memory_sst_store_list_names() {
274 let store = MemorySstStore::new();
275
276 for i in 0..5 {
277 let mut writer = MemorySstWriter::new();
278 writer.add(create_test_record(format!("key{}", i).as_bytes(), i));
279 let sst = writer.finish(format!("sst_{}.sst", i)).unwrap();
280 store.store(format!("sst_{}.sst", i), sst);
281 }
282
283 let names = store.list_names();
284 assert_eq!(names.len(), 5);
285 }
286
287 #[test]
288 fn test_memory_sst_sorted_order() {
289 let mut writer = MemorySstWriter::new();
290 writer.add(create_test_record(b"key3", 3));
292 writer.add(create_test_record(b"key1", 1));
293 writer.add(create_test_record(b"key2", 2));
294
295 let reader = writer.finish("test.sst").unwrap();
296
297 let records: Vec<_> = reader.iter().collect();
298 assert_eq!(records[0].key.pk.as_ref(), b"key1");
300 assert_eq!(records[1].key.pk.as_ref(), b"key2");
301 assert_eq!(records[2].key.pk.as_ref(), b"key3");
302 }
303
304 #[test]
305 fn test_memory_sst_bloom_filter() {
306 let mut writer = MemorySstWriter::new();
307 writer.add(create_test_record(b"exists", 1));
308
309 let reader = writer.finish("test.sst").unwrap();
310
311 assert!(reader.get(&Key::new(b"exists".to_vec())).is_some());
313
314 assert!(reader.get(&Key::new(b"doesnotexist".to_vec())).is_none());
316 }
317}