// sochdb_storage/deferred_index.rs
use parking_lot::RwLock;
use std::cmp::Ordering as CmpOrdering;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
51
/// Tuning knobs for a `DeferredSortedIndex`.
#[derive(Clone, Debug)]
pub struct DeferredIndexConfig {
    /// Hot-buffer length at which `DeferredSortedIndex::insert` raises the
    /// index's compaction flag.
    pub max_unsorted_entries: usize,
    /// NOTE(review): stored, but not read by `DeferredSortedIndex` anywhere in
    /// this file — confirm the intended semantics before relying on it.
    pub enabled: bool,
}

impl Default for DeferredIndexConfig {
    /// Flags compaction after 10 000 buffered keys; `enabled` is `true`.
    fn default() -> Self {
        const DEFAULT_MAX_UNSORTED_ENTRIES: usize = 10_000;

        DeferredIndexConfig {
            enabled: true,
            max_unsorted_entries: DEFAULT_MAX_UNSORTED_ENTRIES,
        }
    }
}
70
71pub struct DeferredSortedIndex {
87 config: DeferredIndexConfig,
89 sorted_vec: RwLock<Vec<Vec<u8>>>,
92 hot_buffer: RwLock<Vec<Vec<u8>>>,
94 needs_compaction: AtomicBool,
96 total_inserts: AtomicU64,
98 total_compactions: AtomicU64,
100}
101
102impl DeferredSortedIndex {
103 pub fn new() -> Self {
105 Self::with_config(DeferredIndexConfig::default())
106 }
107
108 pub fn with_config(config: DeferredIndexConfig) -> Self {
110 Self {
111 config,
112 sorted_vec: RwLock::new(Vec::new()),
113 hot_buffer: RwLock::new(Vec::with_capacity(1000)),
114 needs_compaction: AtomicBool::new(false),
115 total_inserts: AtomicU64::new(0),
116 total_compactions: AtomicU64::new(0),
117 }
118 }
119
120 #[inline]
124 pub fn insert(&self, key: Vec<u8>) {
125 self.total_inserts.fetch_add(1, Ordering::Relaxed);
126
127 {
129 let mut buffer = self.hot_buffer.write();
130 buffer.push(key);
131
132 if buffer.len() >= self.config.max_unsorted_entries {
134 self.needs_compaction.store(true, Ordering::Release);
135 }
136 }
137 }
138
139 #[inline]
141 pub fn insert_ref(&self, key: &[u8]) {
142 self.insert(key.to_vec());
143 }
144
145 pub fn compact(&self) {
150 let entries_to_merge = {
151 let mut buffer = self.hot_buffer.write();
152 if buffer.is_empty() {
153 return;
154 }
155 std::mem::take(&mut *buffer)
156 };
157
158 let mut new_entries = entries_to_merge;
160 new_entries.sort_unstable();
161 new_entries.dedup();
162
163 let mut sorted = self.sorted_vec.write();
165
166 if sorted.is_empty() {
167 *sorted = new_entries;
169 } else {
170 let old_sorted = std::mem::take(&mut *sorted);
172 *sorted = Self::merge_sorted_vecs(old_sorted, new_entries);
173 }
174
175 self.needs_compaction.store(false, Ordering::Release);
176 self.total_compactions.fetch_add(1, Ordering::Relaxed);
177 }
178
179 fn merge_sorted_vecs(a: Vec<Vec<u8>>, b: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
182 let mut result = Vec::with_capacity(a.len() + b.len());
183 let mut i = 0;
184 let mut j = 0;
185
186 while i < a.len() && j < b.len() {
187 match a[i].cmp(&b[j]) {
188 std::cmp::Ordering::Less => {
189 result.push(a[i].clone());
190 i += 1;
191 }
192 std::cmp::Ordering::Greater => {
193 result.push(b[j].clone());
194 j += 1;
195 }
196 std::cmp::Ordering::Equal => {
197 result.push(a[i].clone());
199 i += 1;
200 j += 1;
201 }
202 }
203 }
204
205 while i < a.len() {
207 result.push(a[i].clone());
208 i += 1;
209 }
210 while j < b.len() {
211 result.push(b[j].clone());
212 j += 1;
213 }
214
215 result
216 }
217
218 #[inline]
220 fn ensure_compacted(&self) {
221 if self.needs_compaction.load(Ordering::Acquire)
222 || !self.hot_buffer.read().is_empty()
223 {
224 self.compact();
225 }
226 }
227
228 pub fn range_from<'a>(
232 &'a self,
233 start: &[u8],
234 ) -> impl Iterator<Item = Vec<u8>> + 'a {
235 self.ensure_compacted();
236
237 let sorted = self.sorted_vec.read();
238 let start_idx = sorted.partition_point(|k| k.as_slice() < start);
240
241 let result: Vec<Vec<u8>> = sorted[start_idx..].to_vec();
244 result.into_iter()
245 }
246
247 pub fn range<'a>(
251 &'a self,
252 start: &[u8],
253 end: &[u8],
254 ) -> impl Iterator<Item = Vec<u8>> + 'a {
255 self.ensure_compacted();
256
257 let sorted = self.sorted_vec.read();
258 let start_idx = sorted.partition_point(|k| k.as_slice() < start);
260 let end_idx = sorted.partition_point(|k| k.as_slice() < end);
261
262 let result: Vec<Vec<u8>> = sorted[start_idx..end_idx].to_vec();
264 result.into_iter()
265 }
266
267 pub fn contains(&self, key: &[u8]) -> bool {
269 {
271 let buffer = self.hot_buffer.read();
272 if buffer.iter().any(|k| k.as_slice() == key) {
273 return true;
274 }
275 }
276
277 let sorted = self.sorted_vec.read();
279 sorted.binary_search_by(|k| k.as_slice().cmp(key)).is_ok()
280 }
281
282 pub fn stats(&self) -> DeferredIndexStats {
284 let buffer_len = self.hot_buffer.read().len();
285 let sorted_len = self.sorted_vec.read().len();
286 DeferredIndexStats {
287 sorted_entries: sorted_len,
288 hot_buffer_entries: buffer_len,
289 total_inserts: self.total_inserts.load(Ordering::Relaxed),
290 total_compactions: self.total_compactions.load(Ordering::Relaxed),
291 }
292 }
293
294 pub fn clear(&self) {
296 self.sorted_vec.write().clear();
297 self.hot_buffer.write().clear();
298 self.needs_compaction.store(false, Ordering::Release);
299 }
300
301 pub fn len(&self) -> usize {
303 self.ensure_compacted();
304 self.sorted_vec.read().len()
305 }
306
307 pub fn is_empty(&self) -> bool {
309 self.sorted_vec.read().is_empty() && self.hot_buffer.read().is_empty()
310 }
311}
312
313impl Default for DeferredSortedIndex {
314 fn default() -> Self {
315 Self::new()
316 }
317}
318
/// Point-in-time counters reported by `DeferredSortedIndex::stats`.
#[derive(Debug, Clone)]
pub struct DeferredIndexStats {
    /// Keys held in the compacted, sorted vector.
    pub sorted_entries: usize,
    /// Keys waiting in the unsorted hot buffer.
    pub hot_buffer_entries: usize,
    /// Cumulative number of inserts.
    pub total_inserts: u64,
    /// Cumulative number of compactions.
    pub total_compactions: u64,
}

impl DeferredIndexStats {
    /// Average inserts per compaction; `0.0` when no compaction has run yet.
    pub fn compaction_ratio(&self) -> f64 {
        match self.total_compactions {
            0 => 0.0,
            compactions => self.total_inserts as f64 / compactions as f64,
        }
    }
}
342
#[cfg(test)]
mod tests {
    //! Unit tests for `DeferredSortedIndex` buffering, compaction and lookups.
    use super::*;

    /// Builds owned keys from string literals, for comparing scan output.
    fn keys(items: &[&str]) -> Vec<Vec<u8>> {
        items.iter().map(|k| k.as_bytes().to_vec()).collect()
    }

    #[test]
    fn test_basic_insert_and_scan() {
        let index = DeferredSortedIndex::new();

        index.insert(b"key3".to_vec());
        index.insert(b"key1".to_vec());
        index.insert(b"key2".to_vec());

        let scanned: Vec<_> = index.range_from(b"").collect();
        assert_eq!(scanned, keys(&["key1", "key2", "key3"]));
    }

    #[test]
    fn test_deferred_compaction() {
        let config = DeferredIndexConfig {
            max_unsorted_entries: 5,
            enabled: true,
        };
        let index = DeferredSortedIndex::with_config(config);

        for i in 0..4 {
            index.insert(format!("key{}", i).into_bytes());
        }

        // Below the threshold: nothing flagged, nothing compacted yet.
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
        assert_eq!(index.sorted_vec.read().len(), 0);

        index.insert(b"key4".to_vec());
        assert!(index.needs_compaction.load(Ordering::Relaxed));

        // A sorted read compacts first and clears the flag.
        let scanned: Vec<_> = index.range_from(b"").collect();
        assert_eq!(scanned.len(), 5);
        assert!(!index.needs_compaction.load(Ordering::Relaxed));
    }

    #[test]
    fn test_dedup_on_compaction() {
        let index = DeferredSortedIndex::new();

        index.insert(b"key1".to_vec());
        index.insert(b"key1".to_vec());
        index.insert(b"key2".to_vec());
        index.insert(b"key1".to_vec());

        index.compact();
        let stats = index.stats();
        assert_eq!(stats.sorted_entries, 2);
        assert_eq!(stats.total_inserts, 4);
    }

    #[test]
    fn test_range_scan() {
        let index = DeferredSortedIndex::new();

        for i in 0..10 {
            index.insert(format!("key{:02}", i).into_bytes());
        }

        // Half-open: `start` included, `end` excluded.
        let scanned: Vec<_> = index.range(b"key03", b"key07").collect();
        assert_eq!(scanned, keys(&["key03", "key04", "key05", "key06"]));
    }

    #[test]
    fn test_range_from_midpoint() {
        let index = DeferredSortedIndex::new();

        for i in (0..10).rev() {
            index.insert(format!("key{:02}", i).into_bytes());
        }

        let scanned: Vec<_> = index.range_from(b"key07").collect();
        assert_eq!(scanned, keys(&["key07", "key08", "key09"]));
    }

    #[test]
    fn test_merge_across_compactions() {
        let index = DeferredSortedIndex::new();

        for k in ["b", "d", "f"] {
            index.insert_ref(k.as_bytes());
        }
        index.compact();
        for k in ["a", "d", "g"] {
            index.insert_ref(k.as_bytes());
        }
        index.compact();

        let scanned: Vec<_> = index.range_from(b"").collect();
        assert_eq!(scanned, keys(&["a", "b", "d", "f", "g"]));
        assert_eq!(index.stats().total_compactions, 2);
    }

    #[test]
    fn test_contains_checks_buffer_and_sorted() {
        let index = DeferredSortedIndex::new();

        index.insert(b"sorted".to_vec());
        index.compact();
        index.insert(b"buffered".to_vec());

        assert!(index.contains(b"sorted"));
        assert!(index.contains(b"buffered"));
        assert!(!index.contains(b"missing"));
        // `contains` must not trigger compaction.
        assert_eq!(index.stats().hot_buffer_entries, 1);
    }

    #[test]
    fn test_len_and_is_empty() {
        let index = DeferredSortedIndex::new();
        assert!(index.is_empty());
        assert_eq!(index.len(), 0);

        index.insert(b"a".to_vec());
        // Buffered-only keys already count as non-empty.
        assert!(!index.is_empty());

        index.insert(b"a".to_vec());
        index.insert(b"b".to_vec());
        // `len` compacts, so duplicates collapse.
        assert_eq!(index.len(), 2);
    }

    #[test]
    fn test_clear() {
        let index = DeferredSortedIndex::new();

        index.insert(b"a".to_vec());
        index.compact();
        index.insert(b"b".to_vec());
        index.clear();

        assert!(index.is_empty());
        assert_eq!(index.len(), 0);
        assert!(!index.contains(b"a"));
        assert!(!index.contains(b"b"));
        // Lifetime counters survive `clear`.
        assert_eq!(index.stats().total_inserts, 2);
    }

    #[test]
    fn test_disabled_mode() {
        let config = DeferredIndexConfig {
            enabled: false,
            ..Default::default()
        };
        let index = DeferredSortedIndex::with_config(config);

        index.insert(b"key1".to_vec());
        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 1);
    }

    #[test]
    fn test_concurrent_inserts() {
        use std::sync::Arc;
        use std::thread;

        let index = Arc::new(DeferredSortedIndex::new());
        let mut handles = vec![];

        for t in 0..4 {
            let idx = Arc::clone(&index);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    idx.insert(format!("t{}-key{:03}", t, i).into_bytes());
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        index.compact();
        assert_eq!(index.sorted_vec.read().len(), 400);
    }

    #[test]
    fn test_stats() {
        let index = DeferredSortedIndex::new();

        for i in 0..100 {
            index.insert(format!("key{}", i).into_bytes());
        }

        let stats = index.stats();
        assert_eq!(stats.total_inserts, 100);
        assert_eq!(stats.hot_buffer_entries, 100);
        assert_eq!(stats.sorted_entries, 0);
        assert_eq!(stats.compaction_ratio(), 0.0);

        index.compact();

        let stats = index.stats();
        assert_eq!(stats.total_compactions, 1);
        assert_eq!(stats.hot_buffer_entries, 0);
        assert_eq!(stats.sorted_entries, 100);
        assert_eq!(stats.compaction_ratio(), 100.0);
    }
}