sochdb_storage/
deferred_index.rs1use parking_lot::RwLock;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54
/// Tuning knobs for [`DeferredSortedIndex`].
#[derive(Debug, Clone)]
pub struct DeferredIndexConfig {
    /// Hot-buffer length at which an insert raises the "needs compaction" flag.
    pub max_unsorted_entries: usize,
    /// Feature toggle carried alongside the threshold.
    ///
    /// NOTE(review): no code in the visible part of this file reads this
    /// flag; confirm where (or whether) it is consumed.
    pub enabled: bool,
}

impl Default for DeferredIndexConfig {
    /// Returns the stock configuration: a 10 000-entry threshold, enabled.
    fn default() -> Self {
        let max_unsorted_entries = 10_000;
        let enabled = true;
        DeferredIndexConfig {
            max_unsorted_entries,
            enabled,
        }
    }
}
73
74pub struct DeferredSortedIndex {
90 config: DeferredIndexConfig,
92 sorted_vec: RwLock<Vec<Vec<u8>>>,
95 hot_buffer: RwLock<Vec<Vec<u8>>>,
97 needs_compaction: AtomicBool,
99 total_inserts: AtomicU64,
101 total_compactions: AtomicU64,
103}
104
105impl DeferredSortedIndex {
106 pub fn new() -> Self {
108 Self::with_config(DeferredIndexConfig::default())
109 }
110
111 pub fn with_config(config: DeferredIndexConfig) -> Self {
113 Self {
114 config,
115 sorted_vec: RwLock::new(Vec::new()),
116 hot_buffer: RwLock::new(Vec::with_capacity(1000)),
117 needs_compaction: AtomicBool::new(false),
118 total_inserts: AtomicU64::new(0),
119 total_compactions: AtomicU64::new(0),
120 }
121 }
122
123 #[inline]
127 pub fn insert(&self, key: Vec<u8>) {
128 self.total_inserts.fetch_add(1, Ordering::Relaxed);
129
130 {
132 let mut buffer = self.hot_buffer.write();
133 buffer.push(key);
134
135 if buffer.len() >= self.config.max_unsorted_entries {
137 self.needs_compaction.store(true, Ordering::Release);
138 }
139 }
140 }
141
142 #[inline]
144 pub fn insert_ref(&self, key: &[u8]) {
145 self.insert(key.to_vec());
146 }
147
148 pub fn compact(&self) {
153 let entries_to_merge = {
154 let mut buffer = self.hot_buffer.write();
155 if buffer.is_empty() {
156 return;
157 }
158 std::mem::take(&mut *buffer)
159 };
160
161 let mut new_entries = entries_to_merge;
163 new_entries.sort_unstable();
164 new_entries.dedup();
165
166 let mut sorted = self.sorted_vec.write();
168
169 if sorted.is_empty() {
170 *sorted = new_entries;
172 } else {
173 let old_sorted = std::mem::take(&mut *sorted);
175 *sorted = Self::merge_sorted_vecs(old_sorted, new_entries);
176 }
177
178 self.needs_compaction.store(false, Ordering::Release);
179 self.total_compactions.fetch_add(1, Ordering::Relaxed);
180 }
181
/// Merges two ascending key lists into one ascending list.
///
/// When the same key is at the front of both inputs it is emitted once (the
/// copy from `a` is kept, the copy from `b` is dropped). Duplicates *within*
/// a single input are passed through unchanged.
///
/// Both inputs are consumed and their keys are moved, not cloned, into the
/// result.
fn merge_sorted_vecs(a: Vec<Vec<u8>>, b: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let mut left = a.into_iter().peekable();
    let mut right = b.into_iter().peekable();

    while let (Some(l), Some(r)) = (left.peek(), right.peek()) {
        // The comparison result is a plain value, so the peek borrows end
        // here and the iterators can be advanced inside the arms.
        match l.cmp(r) {
            std::cmp::Ordering::Less => merged.extend(left.next()),
            std::cmp::Ordering::Greater => merged.extend(right.next()),
            std::cmp::Ordering::Equal => {
                merged.extend(left.next());
                // Discard the matching key from `b`.
                right.next();
            }
        }
    }

    // At most one of these still has elements.
    merged.extend(left);
    merged.extend(right);
    merged
}
220
221 #[inline]
223 fn ensure_compacted(&self) {
224 if self.needs_compaction.load(Ordering::Acquire)
225 || !self.hot_buffer.read().is_empty()
226 {
227 self.compact();
228 }
229 }
230
231 pub fn range_from<'a>(
235 &'a self,
236 start: &[u8],
237 ) -> impl Iterator<Item = Vec<u8>> + 'a {
238 self.ensure_compacted();
239
240 let sorted = self.sorted_vec.read();
241 let start_idx = sorted.partition_point(|k| k.as_slice() < start);
243
244 let result: Vec<Vec<u8>> = sorted[start_idx..].to_vec();
247 result.into_iter()
248 }
249
250 pub fn range<'a>(
254 &'a self,
255 start: &[u8],
256 end: &[u8],
257 ) -> impl Iterator<Item = Vec<u8>> + 'a {
258 self.ensure_compacted();
259
260 let sorted = self.sorted_vec.read();
261 let start_idx = sorted.partition_point(|k| k.as_slice() < start);
263 let end_idx = sorted.partition_point(|k| k.as_slice() < end);
264
265 let result: Vec<Vec<u8>> = sorted[start_idx..end_idx].to_vec();
267 result.into_iter()
268 }
269
270 pub fn contains(&self, key: &[u8]) -> bool {
272 {
274 let buffer = self.hot_buffer.read();
275 if buffer.iter().any(|k| k.as_slice() == key) {
276 return true;
277 }
278 }
279
280 let sorted = self.sorted_vec.read();
282 sorted.binary_search_by(|k| k.as_slice().cmp(key)).is_ok()
283 }
284
285 pub fn stats(&self) -> DeferredIndexStats {
287 let buffer_len = self.hot_buffer.read().len();
288 let sorted_len = self.sorted_vec.read().len();
289 DeferredIndexStats {
290 sorted_entries: sorted_len,
291 hot_buffer_entries: buffer_len,
292 total_inserts: self.total_inserts.load(Ordering::Relaxed),
293 total_compactions: self.total_compactions.load(Ordering::Relaxed),
294 }
295 }
296
297 pub fn clear(&self) {
299 self.sorted_vec.write().clear();
300 self.hot_buffer.write().clear();
301 self.needs_compaction.store(false, Ordering::Release);
302 }
303
304 pub fn len(&self) -> usize {
306 self.ensure_compacted();
307 self.sorted_vec.read().len()
308 }
309
310 pub fn is_empty(&self) -> bool {
312 self.sorted_vec.read().is_empty() && self.hot_buffer.read().is_empty()
313 }
314}
315
316impl Default for DeferredSortedIndex {
317 fn default() -> Self {
318 Self::new()
319 }
320}
321
/// Point-in-time counters reported by `DeferredSortedIndex::stats`.
#[derive(Clone, Debug)]
pub struct DeferredIndexStats {
    /// Number of keys in the sorted vector.
    pub sorted_entries: usize,
    /// Number of keys waiting in the unsorted buffer.
    pub hot_buffer_entries: usize,
    /// Insert calls observed so far.
    pub total_inserts: u64,
    /// Compactions performed so far.
    pub total_compactions: u64,
}

impl DeferredIndexStats {
    /// Average number of inserts per compaction.
    ///
    /// Returns `0.0` when no compaction has happened yet, rather than
    /// dividing by zero.
    pub fn compaction_ratio(&self) -> f64 {
        match self.total_compactions {
            0 => 0.0,
            compactions => self.total_inserts as f64 / compactions as f64,
        }
    }
}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned key from a string literal.
    fn key(text: &str) -> Vec<u8> {
        text.as_bytes().to_vec()
    }

    /// Keys inserted out of order come back sorted from a full scan.
    #[test]
    fn test_basic_insert_and_scan() {
        let index = DeferredSortedIndex::new();

        for name in ["key3", "key1", "key2"] {
            index.insert(key(name));
        }

        let scanned: Vec<Vec<u8>> = index.range_from(b"").collect();
        let expected = vec![key("key1"), key("key2"), key("key3")];
        assert_eq!(scanned, expected);
    }

    /// The flag is raised only at the threshold, and a scan clears it.
    #[test]
    fn test_deferred_compaction() {
        let index = DeferredSortedIndex::with_config(DeferredIndexConfig {
            max_unsorted_entries: 5,
            enabled: true,
        });

        (0..4).for_each(|i| index.insert(format!("key{}", i).into_bytes()));

        // Below the threshold: nothing flagged, nothing sorted yet.
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
        assert_eq!(index.sorted_vec.read().len(), 0);

        // The fifth insert reaches the threshold.
        index.insert(key("key4"));
        assert!(index.needs_compaction.load(Ordering::Relaxed));

        // Scanning compacts and lowers the flag.
        let scanned: Vec<Vec<u8>> = index.range_from(b"").collect();
        assert_eq!(scanned.len(), 5);
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
    }

    /// Repeated keys collapse to one entry after compaction.
    #[test]
    fn test_dedup_on_compaction() {
        let index = DeferredSortedIndex::new();

        for name in ["key1", "key1", "key2", "key1"] {
            index.insert(key(name));
        }

        index.compact();
        assert_eq!(index.stats().sorted_entries, 2);
    }

    /// `range` is inclusive at the start and exclusive at the end.
    #[test]
    fn test_range_scan() {
        let index = DeferredSortedIndex::new();

        (0..10).for_each(|i| index.insert(format!("key{:02}", i).into_bytes()));

        let hits = index.range(b"key03", b"key07").count();
        assert_eq!(hits, 4);
    }

    /// With `enabled: false`, insert followed by compact still stores the key.
    #[test]
    fn test_disabled_mode() {
        let index = DeferredSortedIndex::with_config(DeferredIndexConfig {
            enabled: false,
            ..Default::default()
        });

        index.insert(key("key1"));
        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 1);
    }

    /// Four threads inserting distinct keys lose nothing.
    #[test]
    fn test_concurrent_inserts() {
        use std::sync::Arc;
        use std::thread;

        let index = Arc::new(DeferredSortedIndex::new());

        let workers: Vec<_> = (0..4)
            .map(|t| {
                let shared = Arc::clone(&index);
                thread::spawn(move || {
                    for i in 0..100 {
                        shared.insert(format!("t{}-key{:03}", t, i).into_bytes());
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 400);
    }

    /// Counters and sizes reported before and after a compaction.
    #[test]
    fn test_stats() {
        let index = DeferredSortedIndex::new();

        (0..100).for_each(|i| index.insert(format!("key{}", i).into_bytes()));

        let before = index.stats();
        assert_eq!(before.total_inserts, 100);
        assert_eq!(before.hot_buffer_entries, 100);
        assert_eq!(before.sorted_entries, 0);

        index.compact();

        let after = index.stats();
        assert_eq!(after.total_compactions, 1);
        assert_eq!(after.hot_buffer_entries, 0);
        assert_eq!(after.sorted_entries, 100);
    }
}