kstone_core/
memory_sst.rs

//! In-memory Sorted String Table for testing and temporary databases.
//!
//! Provides the same API as the disk-based SST but stores all records in memory.
//! All data is lost when the `MemorySstReader` is dropped.
5
6use crate::{Record, Result, Key, bloom::BloomFilter};
7use bytes::Bytes;
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
/// In-memory SST writer.
///
/// Buffers records in the order they are added; they are only sorted (by
/// encoded key) when `finish` is called.
pub struct MemorySstWriter {
    /// Records collected so far, in insertion order.
    records: Vec<Record>,
}
15
16impl MemorySstWriter {
17    /// Create a new in-memory SST writer
18    pub fn new() -> Self {
19        Self {
20            records: Vec::new(),
21        }
22    }
23
24    /// Add a record to the SST
25    pub fn add(&mut self, record: Record) {
26        self.records.push(record);
27    }
28
29    /// Finish writing and create an in-memory SST reader
30    ///
31    /// Unlike disk-based SST, this doesn't write to a file.
32    /// Instead, it returns a reader with the in-memory data.
33    pub fn finish(mut self, name: impl Into<String>) -> Result<MemorySstReader> {
34        // Sort records by encoded key
35        self.records.sort_by(|a, b| a.key.encode().cmp(&b.key.encode()));
36
37        // Build bloom filter
38        let mut bloom = BloomFilter::new(self.records.len().max(1000), 10);
39        for record in &self.records {
40            bloom.add(&record.key.encode());
41        }
42
43        Ok(MemorySstReader {
44            name: name.into(),
45            records: self.records,
46            bloom,
47        })
48    }
49}
50
51impl Default for MemorySstWriter {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
/// In-memory SST reader.
///
/// Holds every record of one SST in a vector. The derived `Clone` clones each
/// field, including the full record vector.
#[derive(Clone)]
pub struct MemorySstReader {
    /// Name of this SST (its virtual "path").
    name: String,
    /// The records themselves. `get` binary-searches this vector by encoded
    /// key, so it must be sorted that way (`MemorySstWriter::finish` does so).
    records: Vec<Record>,
    /// Bloom filter over the encoded keys, consulted by `get` before searching.
    bloom: BloomFilter,
}
64
65impl MemorySstReader {
66    /// Create a new empty in-memory SST reader (for testing)
67    pub fn empty(name: impl Into<String>) -> Self {
68        Self {
69            name: name.into(),
70            records: Vec::new(),
71            bloom: BloomFilter::new(1000, 10),
72        }
73    }
74
75    /// Get a record by key
76    pub fn get(&self, key: &Key) -> Option<&Record> {
77        let key_bytes = key.encode();
78
79        // Check bloom filter first
80        if !self.bloom.contains(&key_bytes) {
81            return None;
82        }
83
84        // Binary search in sorted records
85        self.records
86            .binary_search_by(|r| r.key.encode().as_ref().cmp(key_bytes.as_ref()))
87            .ok()
88            .map(|idx| &self.records[idx])
89    }
90
91    /// Iterate over all records
92    pub fn iter(&self) -> impl Iterator<Item = &Record> {
93        self.records.iter()
94    }
95
96    /// Scan records with a given partition key prefix
97    pub fn scan_prefix<'a>(&'a self, pk: &'a Bytes) -> impl Iterator<Item = &'a Record> + 'a {
98        self.records.iter().filter(move |r| r.key.pk == *pk)
99    }
100
101    /// Get the name of this SST (virtual "path")
102    pub fn name(&self) -> &str {
103        &self.name
104    }
105
106    /// Scan all records (for compaction)
107    pub fn scan(&self) -> Result<impl Iterator<Item = Record> + '_> {
108        Ok(self.records.iter().cloned())
109    }
110
111    /// Get the number of records
112    pub fn len(&self) -> usize {
113        self.records.len()
114    }
115
116    /// Check if empty
117    pub fn is_empty(&self) -> bool {
118        self.records.is_empty()
119    }
120}
121
/// In-memory SST storage (for managing multiple SSTs).
///
/// This acts like a file system for in-memory SSTs, allowing storage and retrieval by name.
///
/// The derived `Clone` clones the inner `Arc`, so every clone of a store
/// operates on the same underlying map.
#[derive(Clone)]
pub struct MemorySstStore {
    /// SSTs keyed by name, shared between clones and guarded by a mutex.
    ssts: Arc<Mutex<HashMap<String, MemorySstReader>>>,
}
129
130impl MemorySstStore {
131    /// Create a new empty SST store
132    pub fn new() -> Self {
133        Self {
134            ssts: Arc::new(Mutex::new(HashMap::new())),
135        }
136    }
137
138    /// Store an SST
139    pub fn store(&self, name: impl Into<String>, sst: MemorySstReader) {
140        let mut ssts = self.ssts.lock().unwrap();
141        ssts.insert(name.into(), sst);
142    }
143
144    /// Retrieve an SST by name
145    pub fn get(&self, name: &str) -> Option<MemorySstReader> {
146        let ssts = self.ssts.lock().unwrap();
147        ssts.get(name).cloned()
148    }
149
150    /// Delete an SST by name
151    pub fn delete(&self, name: &str) -> bool {
152        let mut ssts = self.ssts.lock().unwrap();
153        ssts.remove(name).is_some()
154    }
155
156    /// List all SST names
157    pub fn list_names(&self) -> Vec<String> {
158        let ssts = self.ssts.lock().unwrap();
159        ssts.keys().cloned().collect()
160    }
161
162    /// Get the number of SSTs
163    pub fn len(&self) -> usize {
164        let ssts = self.ssts.lock().unwrap();
165        ssts.len()
166    }
167
168    /// Check if empty
169    pub fn is_empty(&self) -> bool {
170        self.len() == 0
171    }
172
173    /// Clear all SSTs
174    pub fn clear(&self) {
175        let mut ssts = self.ssts.lock().unwrap();
176        ssts.clear();
177    }
178}
179
180impl Default for MemorySstStore {
181    fn default() -> Self {
182        Self::new()
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;

    /// Build a `put` record for partition key `pk` with sequence number `seq`
    /// and a single fixed attribute.
    fn create_test_record(pk: &[u8], seq: u64) -> Record {
        let mut item = HashMap::new();
        item.insert("test".to_string(), Value::string("value"));
        Record::put(Key::new(pk.to_vec()), item, seq)
    }

    #[test]
    fn test_memory_sst_writer_finish() {
        let mut writer = MemorySstWriter::new();
        writer.add(create_test_record(b"key1", 1));
        writer.add(create_test_record(b"key2", 2));

        let reader = writer.finish("test.sst").unwrap();
        assert_eq!(reader.len(), 2);
        assert!(!reader.is_empty());
        assert_eq!(reader.name(), "test.sst");
    }

    #[test]
    fn test_memory_sst_writer_default_is_empty() {
        let reader = MemorySstWriter::default().finish("default.sst").unwrap();
        assert!(reader.is_empty());
        assert_eq!(reader.len(), 0);
        assert_eq!(reader.name(), "default.sst");
    }

    #[test]
    fn test_memory_sst_reader_get() {
        let mut writer = MemorySstWriter::new();
        let key1 = Key::new(b"key1".to_vec());
        writer.add(create_test_record(b"key1", 1));

        let reader = writer.finish("test.sst").unwrap();

        let record = reader.get(&key1);
        assert!(record.is_some());
        assert_eq!(record.unwrap().seq, 1);
        assert_eq!(record.unwrap().key.pk.as_ref(), b"key1");

        let missing = reader.get(&Key::new(b"missing".to_vec()));
        assert!(missing.is_none());
    }

    #[test]
    fn test_memory_sst_reader_empty() {
        let reader = MemorySstReader::empty("empty.sst");

        assert!(reader.is_empty());
        assert_eq!(reader.len(), 0);
        assert_eq!(reader.name(), "empty.sst");
        assert_eq!(reader.iter().count(), 0);
        assert_eq!(reader.scan().unwrap().count(), 0);
        assert!(reader.get(&Key::new(b"anything".to_vec())).is_none());
    }

    #[test]
    fn test_memory_sst_reader_iter() {
        let mut writer = MemorySstWriter::new();
        writer.add(create_test_record(b"key1", 1));
        writer.add(create_test_record(b"key2", 2));
        writer.add(create_test_record(b"key3", 3));

        let reader = writer.finish("test.sst").unwrap();

        let records: Vec<_> = reader.iter().collect();
        assert_eq!(records.len(), 3);
    }

    #[test]
    fn test_memory_sst_reader_scan() {
        let mut writer = MemorySstWriter::new();
        // Add out of order; `scan` must yield owned records in sorted order.
        writer.add(create_test_record(b"key3", 3));
        writer.add(create_test_record(b"key1", 1));
        writer.add(create_test_record(b"key2", 2));

        let reader = writer.finish("test.sst").unwrap();

        let records: Vec<Record> = reader.scan().unwrap().collect();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].seq, 1);
        assert_eq!(records[1].seq, 2);
        assert_eq!(records[2].seq, 3);

        // Scanning clones; the reader still holds its records afterwards.
        assert_eq!(reader.len(), 3);
    }

    #[test]
    fn test_memory_sst_reader_scan_prefix() {
        let mut writer = MemorySstWriter::new();
        writer.add(create_test_record(b"user#1", 1));
        writer.add(create_test_record(b"user#2", 2));
        writer.add(create_test_record(b"post#1", 3));

        let reader = writer.finish("test.sst").unwrap();

        let pk = Bytes::from_static(b"user#1");
        let records: Vec<_> = reader.scan_prefix(&pk).collect();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.pk, pk);
        assert_eq!(records[0].seq, 1);

        // A partition key that was never written matches nothing.
        let absent = Bytes::from_static(b"user#3");
        assert_eq!(reader.scan_prefix(&absent).count(), 0);
    }

    #[test]
    fn test_memory_sst_store() {
        let store = MemorySstStore::new();
        assert!(store.is_empty());

        let mut writer = MemorySstWriter::new();
        writer.add(create_test_record(b"key1", 1));
        let sst = writer.finish("test.sst").unwrap();

        store.store("test.sst", sst);
        assert_eq!(store.len(), 1);
        assert!(!store.is_empty());

        let retrieved = store.get("test.sst");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().len(), 1);
        assert!(store.get("other.sst").is_none());

        // `delete` reports whether something was actually removed.
        assert!(store.delete("test.sst"));
        assert!(store.is_empty());
        assert!(store.get("test.sst").is_none());
        assert!(!store.delete("test.sst"));
    }

    #[test]
    fn test_memory_sst_store_overwrite_and_shared_clone() {
        let store = MemorySstStore::new();
        let handle = store.clone();

        store.store("test.sst", MemorySstReader::empty("test.sst"));

        let mut writer = MemorySstWriter::new();
        writer.add(create_test_record(b"key1", 1));
        // Storing under an existing name replaces the previous SST.
        handle.store("test.sst", writer.finish("test.sst").unwrap());

        assert_eq!(store.len(), 1);
        assert_eq!(handle.len(), 1);
        assert_eq!(store.get("test.sst").unwrap().len(), 1);
    }

    #[test]
    fn test_memory_sst_store_list_names() {
        let store = MemorySstStore::new();

        for i in 0..5 {
            let mut writer = MemorySstWriter::new();
            writer.add(create_test_record(format!("key{}", i).as_bytes(), i));
            let sst = writer.finish(format!("sst_{}.sst", i)).unwrap();
            store.store(format!("sst_{}.sst", i), sst);
        }

        // The listing order is unspecified, so sort before comparing.
        let mut names = store.list_names();
        names.sort();
        let expected: Vec<String> = (0..5).map(|i| format!("sst_{}.sst", i)).collect();
        assert_eq!(names, expected);
    }

    #[test]
    fn test_memory_sst_store_clear() {
        let store = MemorySstStore::new();
        store.store("a.sst", MemorySstReader::empty("a.sst"));
        store.store("b.sst", MemorySstReader::empty("b.sst"));
        assert_eq!(store.len(), 2);

        store.clear();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert!(store.list_names().is_empty());
        assert!(store.get("a.sst").is_none());
    }

    #[test]
    fn test_memory_sst_sorted_order() {
        let mut writer = MemorySstWriter::new();
        // Add in reverse order
        writer.add(create_test_record(b"key3", 3));
        writer.add(create_test_record(b"key1", 1));
        writer.add(create_test_record(b"key2", 2));

        let reader = writer.finish("test.sst").unwrap();

        let records: Vec<_> = reader.iter().collect();
        // Should be sorted by key
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].key.pk.as_ref(), b"key1");
        assert_eq!(records[1].key.pk.as_ref(), b"key2");
        assert_eq!(records[2].key.pk.as_ref(), b"key3");
    }

    #[test]
    fn test_memory_sst_bloom_filter() {
        let mut writer = MemorySstWriter::new();
        writer.add(create_test_record(b"exists", 1));

        let reader = writer.finish("test.sst").unwrap();

        // Key that exists
        assert!(reader.get(&Key::new(b"exists".to_vec())).is_some());

        // Key that doesn't exist (bloom filter should help)
        assert!(reader.get(&Key::new(b"doesnotexist".to_vec())).is_none());
    }
}
317}