sochdb_storage/
deferred_index.rs1use parking_lot::RwLock;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54
/// Tuning knobs for [`DeferredSortedIndex`].
#[derive(Debug, Clone)]
pub struct DeferredIndexConfig {
    /// Hot-buffer length at which an insert flags the index as needing
    /// compaction.
    pub max_unsorted_entries: usize,
    /// Toggle for deferred indexing; `true` in the default configuration.
    ///
    /// NOTE(review): no code visible in this file reads this flag — confirm
    /// where (or whether) it is consumed.
    pub enabled: bool,
}
64
65impl Default for DeferredIndexConfig {
66 fn default() -> Self {
67 Self {
68 max_unsorted_entries: 10_000,
69 enabled: true,
70 }
71 }
72}
73
74pub struct DeferredSortedIndex {
90 config: DeferredIndexConfig,
92 sorted_vec: RwLock<Vec<Vec<u8>>>,
95 hot_buffer: RwLock<Vec<Vec<u8>>>,
97 needs_compaction: AtomicBool,
99 total_inserts: AtomicU64,
101 total_compactions: AtomicU64,
103}
104
105impl DeferredSortedIndex {
106 pub fn new() -> Self {
108 Self::with_config(DeferredIndexConfig::default())
109 }
110
111 pub fn with_config(config: DeferredIndexConfig) -> Self {
113 Self {
114 config,
115 sorted_vec: RwLock::new(Vec::new()),
116 hot_buffer: RwLock::new(Vec::with_capacity(1000)),
117 needs_compaction: AtomicBool::new(false),
118 total_inserts: AtomicU64::new(0),
119 total_compactions: AtomicU64::new(0),
120 }
121 }
122
123 #[inline]
127 pub fn insert(&self, key: Vec<u8>) {
128 self.total_inserts.fetch_add(1, Ordering::Relaxed);
129
130 {
132 let mut buffer = self.hot_buffer.write();
133 buffer.push(key);
134
135 if buffer.len() >= self.config.max_unsorted_entries {
137 self.needs_compaction.store(true, Ordering::Release);
138 }
139 }
140 }
141
142 #[inline]
144 pub fn insert_ref(&self, key: &[u8]) {
145 self.insert(key.to_vec());
146 }
147
148 pub fn compact(&self) {
153 let entries_to_merge = {
154 let mut buffer = self.hot_buffer.write();
155 if buffer.is_empty() {
156 return;
157 }
158 std::mem::take(&mut *buffer)
159 };
160
161 let mut new_entries = entries_to_merge;
163 new_entries.sort_unstable();
164 new_entries.dedup();
165
166 let mut sorted = self.sorted_vec.write();
168
169 if sorted.is_empty() {
170 *sorted = new_entries;
172 } else {
173 let old_sorted = std::mem::take(&mut *sorted);
175 *sorted = Self::merge_sorted_vecs(old_sorted, new_entries);
176 }
177
178 self.needs_compaction.store(false, Ordering::Release);
179 self.total_compactions.fetch_add(1, Ordering::Relaxed);
180 }
181
182 fn merge_sorted_vecs(a: Vec<Vec<u8>>, b: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
185 let mut result = Vec::with_capacity(a.len() + b.len());
186 let mut i = 0;
187 let mut j = 0;
188
189 while i < a.len() && j < b.len() {
190 match a[i].cmp(&b[j]) {
191 std::cmp::Ordering::Less => {
192 result.push(a[i].clone());
193 i += 1;
194 }
195 std::cmp::Ordering::Greater => {
196 result.push(b[j].clone());
197 j += 1;
198 }
199 std::cmp::Ordering::Equal => {
200 result.push(a[i].clone());
202 i += 1;
203 j += 1;
204 }
205 }
206 }
207
208 while i < a.len() {
210 result.push(a[i].clone());
211 i += 1;
212 }
213 while j < b.len() {
214 result.push(b[j].clone());
215 j += 1;
216 }
217
218 result
219 }
220
221 #[inline]
223 fn ensure_compacted(&self) {
224 if self.needs_compaction.load(Ordering::Acquire) || !self.hot_buffer.read().is_empty() {
225 self.compact();
226 }
227 }
228
229 pub fn range_from<'a>(&'a self, start: &[u8]) -> impl Iterator<Item = Vec<u8>> + 'a {
233 self.ensure_compacted();
234
235 let sorted = self.sorted_vec.read();
236 let start_idx = sorted.partition_point(|k| k.as_slice() < start);
238
239 let result: Vec<Vec<u8>> = sorted[start_idx..].to_vec();
242 result.into_iter()
243 }
244
245 pub fn range<'a>(&'a self, start: &[u8], end: &[u8]) -> impl Iterator<Item = Vec<u8>> + 'a {
249 self.ensure_compacted();
250
251 let sorted = self.sorted_vec.read();
252 let start_idx = sorted.partition_point(|k| k.as_slice() < start);
254 let end_idx = sorted.partition_point(|k| k.as_slice() < end);
255
256 let result: Vec<Vec<u8>> = sorted[start_idx..end_idx].to_vec();
258 result.into_iter()
259 }
260
261 pub fn contains(&self, key: &[u8]) -> bool {
263 {
265 let buffer = self.hot_buffer.read();
266 if buffer.iter().any(|k| k.as_slice() == key) {
267 return true;
268 }
269 }
270
271 let sorted = self.sorted_vec.read();
273 sorted.binary_search_by(|k| k.as_slice().cmp(key)).is_ok()
274 }
275
276 pub fn stats(&self) -> DeferredIndexStats {
278 let buffer_len = self.hot_buffer.read().len();
279 let sorted_len = self.sorted_vec.read().len();
280 DeferredIndexStats {
281 sorted_entries: sorted_len,
282 hot_buffer_entries: buffer_len,
283 total_inserts: self.total_inserts.load(Ordering::Relaxed),
284 total_compactions: self.total_compactions.load(Ordering::Relaxed),
285 }
286 }
287
288 pub fn clear(&self) {
290 self.sorted_vec.write().clear();
291 self.hot_buffer.write().clear();
292 self.needs_compaction.store(false, Ordering::Release);
293 }
294
295 pub fn len(&self) -> usize {
297 self.ensure_compacted();
298 self.sorted_vec.read().len()
299 }
300
301 pub fn is_empty(&self) -> bool {
303 self.sorted_vec.read().is_empty() && self.hot_buffer.read().is_empty()
304 }
305}
306
307impl Default for DeferredSortedIndex {
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
/// Point-in-time counters reported by `DeferredSortedIndex::stats`.
#[derive(Clone, Debug)]
pub struct DeferredIndexStats {
    /// Number of keys currently in the sorted vector.
    pub sorted_entries: usize,
    /// Number of keys currently waiting in the unsorted hot buffer.
    pub hot_buffer_entries: usize,
    /// Total number of insert calls since the index was created.
    pub total_inserts: u64,
    /// Total number of compactions that merged buffered keys.
    pub total_compactions: u64,
}
325
326impl DeferredIndexStats {
327 pub fn compaction_ratio(&self) -> f64 {
329 if self.total_compactions == 0 {
330 0.0
331 } else {
332 self.total_inserts as f64 / self.total_compactions as f64
333 }
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Keys inserted out of order come back sorted from a full scan.
    #[test]
    fn test_basic_insert_and_scan() {
        let index = DeferredSortedIndex::new();

        for key in &["key3", "key1", "key2"] {
            index.insert(key.as_bytes().to_vec());
        }

        let scanned: Vec<Vec<u8>> = index.range_from(b"").collect();
        let expected: Vec<Vec<u8>> = vec![b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec()];
        assert_eq!(scanned, expected);
    }

    /// The flag is raised only once the threshold is reached, and a scan
    /// performs the pending compaction.
    #[test]
    fn test_deferred_compaction() {
        let index = DeferredSortedIndex::with_config(DeferredIndexConfig {
            max_unsorted_entries: 5,
            enabled: true,
        });

        // Four keys: one short of the threshold.
        (0..4).for_each(|i| index.insert(format!("key{}", i).into_bytes()));

        assert!(!index.needs_compaction.load(Ordering::Relaxed));
        assert_eq!(index.sorted_vec.read().len(), 0);

        // The fifth key reaches the threshold.
        index.insert(b"key4".to_vec());
        assert!(index.needs_compaction.load(Ordering::Relaxed));

        // Scanning compacts and lowers the flag.
        let scanned: Vec<_> = index.range_from(b"").collect();
        assert_eq!(scanned.len(), 5);
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
    }

    /// Repeated keys collapse to a single entry during compaction.
    #[test]
    fn test_dedup_on_compaction() {
        let index = DeferredSortedIndex::new();

        for key in &["key1", "key1", "key2", "key1"] {
            index.insert(key.as_bytes().to_vec());
        }

        index.compact();
        assert_eq!(index.stats().sorted_entries, 2);
    }

    /// A bounded scan excludes the end key.
    #[test]
    fn test_range_scan() {
        let index = DeferredSortedIndex::new();

        (0..10).for_each(|i| index.insert(format!("key{:02}", i).into_bytes()));

        let in_range: Vec<_> = index.range(b"key03", b"key07").collect();
        assert_eq!(in_range.len(), 4);
    }

    /// With `enabled: false` an explicit compaction still moves keys into the
    /// sorted vector.
    #[test]
    fn test_disabled_mode() {
        let index = DeferredSortedIndex::with_config(DeferredIndexConfig {
            enabled: false,
            ..Default::default()
        });

        index.insert(b"key1".to_vec());
        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 1);
    }

    /// Inserts from several threads are all retained.
    #[test]
    fn test_concurrent_inserts() {
        use std::sync::Arc;
        use std::thread;

        let index = Arc::new(DeferredSortedIndex::new());

        let workers: Vec<_> = (0..4)
            .map(|t| {
                let idx = Arc::clone(&index);
                thread::spawn(move || {
                    for i in 0..100 {
                        idx.insert(format!("t{}-key{:03}", t, i).into_bytes());
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 400);
    }

    /// Counters reflect the state before and after a compaction.
    #[test]
    fn test_stats() {
        let index = DeferredSortedIndex::new();

        (0..100).for_each(|i| index.insert(format!("key{}", i).into_bytes()));

        let before = index.stats();
        assert_eq!(before.total_inserts, 100);
        assert_eq!(before.hot_buffer_entries, 100);
        assert_eq!(before.sorted_entries, 0);

        index.compact();

        let after = index.stats();
        assert_eq!(after.total_compactions, 1);
        assert_eq!(after.hot_buffer_entries, 0);
        assert_eq!(after.sorted_entries, 100);
    }
}