sochdb_storage/
stratified_skiplist.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Stratified SkipList with Deferred LSM Promotion (Task 2)
16//!
17//! This module provides a probabilistic hot-key buffer with CAS-based insertion
18//! and batched promotion to the underlying LSM tree.
19//!
20//! ## Problem
21//!
22//! Every insert traverses the full memtable skiplist, even for frequently updated
23//! keys that will be overwritten soon. Hot keys cause repeated work.
24//!
25//! ## Solution
26//!
27//! Two-level structure:
28//! 1. **Hot Buffer:** Small, CAS-based skiplist for recent writes
29//! 2. **Cold Storage:** Full memtable (promoted in batches)
30//!
31//! Hot keys are absorbed by the buffer; only final values get promoted.
32//!
33//! ## Performance
34//!
35//! | Metric | Before | After |
36//! |--------|--------|-------|
37//! | Hot key update | O(log N) | O(log H) where H << N |
38//! | Promotion overhead | Per-op | Batched (amortized) |
39//! | Lock contention | High | CAS-based (lock-free) |
40
41use std::cmp::Ordering;
42use std::ptr;
43use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
44use std::sync::Arc;
45
46/// Maximum skiplist height
47const MAX_HEIGHT: usize = 16;
48
49/// Probability for height selection (1/4)
50const P: u32 = 4;
51
52/// Default hot buffer capacity before promotion
53const DEFAULT_HOT_CAPACITY: usize = 4096;
54
55// ============================================================================
56// Skip Node (Lock-Free Tower)
57// ============================================================================
58
59/// Tower of forward pointers for a skip node
60#[repr(C)]
61struct Tower<K, V> {
62    /// Height of this tower (1..=MAX_HEIGHT)
63    height: usize,
64    /// Forward pointers at each level
65    next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
66}
67
68impl<K, V> Tower<K, V> {
69    /// Create a new tower with the given height
70    fn new(height: usize) -> Self {
71        let mut next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT] =
72            std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
73        
74        // Initialize all pointers to null
75        for ptr in next.iter_mut().take(height) {
76            *ptr = AtomicPtr::new(ptr::null_mut());
77        }
78        
79        Self { height, next }
80    }
81    
82    /// Get the forward pointer at a level
83    #[inline]
84    fn get(&self, level: usize) -> *mut SkipNode<K, V> {
85        self.next[level].load(AtomicOrdering::Acquire)
86    }
87    
88    /// Set the forward pointer at a level
89    #[inline]
90    fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
91        self.next[level].store(node, AtomicOrdering::Release);
92    }
93    
94    /// CAS the forward pointer at a level
95    #[inline]
96    fn cas(
97        &self,
98        level: usize,
99        expected: *mut SkipNode<K, V>,
100        new: *mut SkipNode<K, V>,
101    ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
102        self.next[level]
103            .compare_exchange(expected, new, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
104    }
105}
106
107/// A node in the skip list
108#[repr(C)]
109struct SkipNode<K, V> {
110    /// The key
111    key: K,
112    /// The value (can be updated atomically)
113    value: AtomicPtr<V>,
114    /// Version counter for optimistic concurrency
115    version: AtomicU64,
116    /// Tower of forward pointers
117    tower: Tower<K, V>,
118}
119
120impl<K, V> SkipNode<K, V> {
121    /// Create a new skip node
122    fn new(key: K, value: V, height: usize) -> *mut Self {
123        let value_ptr = Box::into_raw(Box::new(value));
124        let node = Box::new(Self {
125            key,
126            value: AtomicPtr::new(value_ptr),
127            version: AtomicU64::new(1),
128            tower: Tower::new(height),
129        });
130        Box::into_raw(node)
131    }
132    
133    /// Get the current value
134    #[inline]
135    unsafe fn get_value(&self) -> &V {
136        unsafe { &*self.value.load(AtomicOrdering::Acquire) }
137    }
138    
139    /// Update the value (CAS-based)
140    #[inline]
141    unsafe fn update_value(&self, new_value: V) -> V {
142        let new_ptr = Box::into_raw(Box::new(new_value));
143        let old_ptr = self.value.swap(new_ptr, AtomicOrdering::AcqRel);
144        self.version.fetch_add(1, AtomicOrdering::Release);
145        unsafe { *Box::from_raw(old_ptr) }
146    }
147}
148
149// ============================================================================
150// Stratified SkipList
151// ============================================================================
152
153/// Thread-local random state for height selection
154fn random_height() -> usize {
155    let mut height = 1;
156    // Use a simple PRNG based on pointer address and time
157    let mut state = (&height as *const _ as u64)
158        .wrapping_mul(0x9E3779B97F4A7C15)
159        .wrapping_add(
160            std::time::SystemTime::now()
161                .duration_since(std::time::UNIX_EPOCH)
162                .map(|d| d.as_nanos() as u64)
163                .unwrap_or(0),
164        );
165    
166    while height < MAX_HEIGHT {
167        state = state.wrapping_mul(1103515245).wrapping_add(12345);
168        if (state >> 17) % (P as u64) != 0 {
169            break;
170        }
171        height += 1;
172    }
173    height
174}
175
176/// Stratified skip list with lock-free operations
177pub struct StratifiedSkipList<K, V>
178where
179    K: Ord + Clone,
180    V: Clone,
181{
182    /// Head sentinel node
183    head: *mut SkipNode<K, V>,
184    /// Current maximum height
185    max_height: AtomicUsize,
186    /// Number of elements
187    len: AtomicUsize,
188    /// Capacity before promotion
189    capacity: usize,
190    /// Promotion callback
191    promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
192}
193
194impl<K, V> StratifiedSkipList<K, V>
195where
196    K: Ord + Clone + Default,
197    V: Clone + Default,
198{
199    /// Create a new stratified skip list
200    pub fn new() -> Self {
201        Self::with_capacity(DEFAULT_HOT_CAPACITY)
202    }
203    
204    /// Create with custom capacity
205    pub fn with_capacity(capacity: usize) -> Self {
206        // Create head sentinel with default values
207        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
208        
209        Self {
210            head,
211            max_height: AtomicUsize::new(1),
212            len: AtomicUsize::new(0),
213            capacity,
214            promoter: None,
215        }
216    }
217    
218    /// Set the promotion callback
219    pub fn set_promoter<F>(&mut self, promoter: F)
220    where
221        F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
222    {
223        self.promoter = Some(Arc::new(promoter));
224    }
225    
226    /// Insert or update a key-value pair (CAS-based)
227    ///
228    /// Returns the old value if the key existed.
229    pub fn insert(&self, key: K, value: V) -> Option<V> {
230        // Check if we need to promote before inserting
231        if self.len() >= self.capacity {
232            self.try_promote();
233        }
234        
235        let height = random_height();
236        let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
237        let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
238        
239        loop {
240            // Find position
241            self.find_position(&key, &mut prev, &mut next);
242            
243            // Check if key exists
244            if !next[0].is_null() {
245                let existing = unsafe { &*next[0] };
246                if existing.key == key {
247                    // Update existing value
248                    let old = unsafe { existing.update_value(value.clone()) };
249                    return Some(old);
250                }
251            }
252            
253            // Create new node
254            let new_node = SkipNode::new(key.clone(), value.clone(), height);
255            
256            // Link at level 0 first
257            unsafe {
258                (*new_node).tower.set(0, next[0]);
259            }
260            
261            // CAS at level 0
262            let prev_ptr = if prev[0].is_null() { self.head } else { prev[0] };
263            match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
264                Ok(_) => {
265                    // Successfully inserted at level 0
266                    // Now link higher levels
267                    for level in 1..height {
268                        loop {
269                            unsafe {
270                                (*new_node).tower.set(level, next[level]);
271                            }
272                            
273                            let prev_at_level = if prev[level].is_null() { self.head } else { prev[level] };
274                            if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }.is_ok() {
275                                break;
276                            }
277                            
278                            // Re-find position at this level
279                            self.find_position(&key, &mut prev, &mut next);
280                        }
281                    }
282                    
283                    // Update max height
284                    loop {
285                        let current_max = self.max_height.load(AtomicOrdering::Relaxed);
286                        if height <= current_max {
287                            break;
288                        }
289                        if self.max_height
290                            .compare_exchange_weak(current_max, height, AtomicOrdering::Release, AtomicOrdering::Relaxed)
291                            .is_ok()
292                        {
293                            break;
294                        }
295                    }
296                    
297                    self.len.fetch_add(1, AtomicOrdering::Release);
298                    return None;
299                }
300                Err(_) => {
301                    // CAS failed, someone else inserted
302                    // Free the node we created and retry
303                    unsafe {
304                        let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
305                        drop(Box::from_raw(value_ptr));
306                        drop(Box::from_raw(new_node));
307                    }
308                    continue;
309                }
310            }
311        }
312    }
313    
314    /// Find the position for a key
315    fn find_position(
316        &self,
317        key: &K,
318        prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
319        next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
320    ) {
321        let max_height = self.max_height.load(AtomicOrdering::Acquire);
322        let mut current = self.head;
323        
324        for level in (0..max_height).rev() {
325            loop {
326                let next_node = unsafe { (*current).tower.get(level) };
327                if next_node.is_null() {
328                    break;
329                }
330                
331                let next_key = unsafe { &(*next_node).key };
332                match next_key.cmp(key) {
333                    Ordering::Less => {
334                        current = next_node;
335                    }
336                    Ordering::Equal | Ordering::Greater => {
337                        break;
338                    }
339                }
340            }
341            
342            prev[level] = if current == self.head { ptr::null_mut() } else { current };
343            next[level] = unsafe { (*current).tower.get(level) };
344        }
345    }
346    
347    /// Get a value by key
348    pub fn get(&self, key: &K) -> Option<V> {
349        let max_height = self.max_height.load(AtomicOrdering::Acquire);
350        let mut current = self.head;
351        
352        for level in (0..max_height).rev() {
353            loop {
354                let next_node = unsafe { (*current).tower.get(level) };
355                if next_node.is_null() {
356                    break;
357                }
358                
359                let next_key = unsafe { &(*next_node).key };
360                match next_key.cmp(key) {
361                    Ordering::Less => {
362                        current = next_node;
363                    }
364                    Ordering::Equal => {
365                        let value = unsafe { (*next_node).get_value().clone() };
366                        return Some(value);
367                    }
368                    Ordering::Greater => {
369                        break;
370                    }
371                }
372            }
373        }
374        
375        None
376    }
377    
378    /// Get the number of elements
379    #[inline]
380    pub fn len(&self) -> usize {
381        self.len.load(AtomicOrdering::Acquire)
382    }
383    
384    /// Check if empty
385    #[inline]
386    pub fn is_empty(&self) -> bool {
387        self.len() == 0
388    }
389    
390    /// Try to promote entries to the underlying storage
391    fn try_promote(&self) {
392        if let Some(ref promoter) = self.promoter {
393            let entries = self.drain();
394            if !entries.is_empty() {
395                promoter(entries);
396            }
397        }
398    }
399    
400    /// Drain all entries for promotion
401    pub fn drain(&self) -> Vec<(K, V)> {
402        let mut entries = Vec::with_capacity(self.len());
403        let mut current = unsafe { (*self.head).tower.get(0) };
404        
405        while !current.is_null() {
406            let node = unsafe { &*current };
407            let key = node.key.clone();
408            let value = unsafe { node.get_value().clone() };
409            entries.push((key, value));
410            current = node.tower.get(0);
411        }
412        
413        // Note: In production, we'd use proper marking for deletion
414        // For now, just reset the count (simplified)
415        self.len.store(0, AtomicOrdering::Release);
416        
417        entries
418    }
419    
420    /// Iterate over all entries (read-only)
421    pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
422        SkipListIter {
423            current: unsafe { (*self.head).tower.get(0) },
424            _marker: std::marker::PhantomData,
425        }
426    }
427}
428
429impl<K, V> Default for StratifiedSkipList<K, V>
430where
431    K: Ord + Clone + Default,
432    V: Clone + Default,
433{
434    fn default() -> Self {
435        Self::new()
436    }
437}
438
439impl<K, V> Drop for StratifiedSkipList<K, V>
440where
441    K: Ord + Clone,
442    V: Clone,
443{
444    fn drop(&mut self) {
445        let mut current = unsafe { (*self.head).tower.get(0) };
446        
447        while !current.is_null() {
448            let next = unsafe { (*current).tower.get(0) };
449            unsafe {
450                let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
451                drop(Box::from_raw(value_ptr));
452                drop(Box::from_raw(current));
453            }
454            current = next;
455        }
456        
457        // Free head
458        unsafe {
459            let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
460            drop(Box::from_raw(value_ptr));
461            drop(Box::from_raw(self.head));
462        }
463    }
464}
465
466// Safety: StratifiedSkipList uses atomic operations for all mutations
467unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Send for StratifiedSkipList<K, V> {}
468unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Sync for StratifiedSkipList<K, V> {}
469
470/// Iterator over skip list entries
471struct SkipListIter<'a, K, V> {
472    current: *mut SkipNode<K, V>,
473    _marker: std::marker::PhantomData<&'a ()>,
474}
475
476impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
477    type Item = (K, V);
478    
479    fn next(&mut self) -> Option<Self::Item> {
480        if self.current.is_null() {
481            return None;
482        }
483        
484        let node = unsafe { &*self.current };
485        let key = node.key.clone();
486        let value = unsafe { node.get_value().clone() };
487        self.current = node.tower.get(0);
488        
489        Some((key, value))
490    }
491}
492
493// ============================================================================
494// Batch Promoter
495// ============================================================================
496
497/// Statistics for promotion
498#[derive(Debug, Clone, Default)]
499pub struct PromotionStats {
500    /// Number of promotions
501    pub promotion_count: u64,
502    /// Total entries promoted
503    pub entries_promoted: u64,
504    /// Average batch size
505    pub avg_batch_size: f64,
506}
507
508/// Batch promoter for deferred LSM promotion
509pub struct BatchPromoter<K, V>
510where
511    K: Ord + Clone + Default,
512    V: Clone + Default,
513{
514    /// Hot buffer (stratified skip list)
515    hot_buffer: StratifiedSkipList<K, V>,
516    /// Pending promotion batches
517    #[allow(dead_code)]
518    pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
519    /// Statistics
520    stats: std::sync::Mutex<PromotionStats>,
521    /// Background promoter thread handle
522    _background: Option<std::thread::JoinHandle<()>>,
523}
524
525impl<K, V> BatchPromoter<K, V>
526where
527    K: Ord + Clone + Default + Send + Sync + 'static,
528    V: Clone + Default + Send + Sync + 'static,
529{
530    /// Create a new batch promoter
531    pub fn new(hot_capacity: usize) -> Arc<Self> {
532        let promoter = Arc::new(Self {
533            hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
534            pending: std::sync::Mutex::new(Vec::new()),
535            stats: std::sync::Mutex::new(PromotionStats::default()),
536            _background: None,
537        });
538        
539        promoter
540    }
541    
542    /// Insert a hot key
543    pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
544        self.hot_buffer.insert(key, value)
545    }
546    
547    /// Get a value (checks hot buffer first)
548    pub fn get(&self, key: &K) -> Option<V> {
549        self.hot_buffer.get(key)
550    }
551    
552    /// Force promotion of all hot entries
553    pub fn force_promote(&self) -> Vec<(K, V)> {
554        let entries = self.hot_buffer.drain();
555        
556        if !entries.is_empty() {
557            let mut stats = self.stats.lock().unwrap();
558            stats.promotion_count += 1;
559            stats.entries_promoted += entries.len() as u64;
560            stats.avg_batch_size = stats.entries_promoted as f64 / stats.promotion_count as f64;
561        }
562        
563        entries
564    }
565    
566    /// Get statistics
567    pub fn stats(&self) -> PromotionStats {
568        self.stats.lock().unwrap().clone()
569    }
570    
571    /// Get hot buffer size
572    pub fn hot_size(&self) -> usize {
573        self.hot_buffer.len()
574    }
575}
576
577// ============================================================================
578// Deferred Index (combines hot buffer with cold storage)
579// ============================================================================
580
581/// Deferred index with stratified storage
582pub struct DeferredIndex<K, V, Cold>
583where
584    K: Ord + Clone + Default,
585    V: Clone + Default,
586{
587    /// Hot tier (stratified skip list)
588    hot: StratifiedSkipList<K, V>,
589    /// Cold tier (underlying storage)
590    cold: Cold,
591    /// Promotion threshold
592    promotion_threshold: usize,
593    /// Insert count since last promotion
594    insert_count: AtomicUsize,
595}
596
597impl<K, V, Cold> DeferredIndex<K, V, Cold>
598where
599    K: Ord + Clone + Default,
600    V: Clone + Default,
601    Cold: ColdStorage<K, V>,
602{
603    /// Create a new deferred index
604    pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
605        Self {
606            hot: StratifiedSkipList::with_capacity(promotion_threshold),
607            cold,
608            promotion_threshold,
609            insert_count: AtomicUsize::new(0),
610        }
611    }
612    
613    /// Insert a key-value pair
614    pub fn insert(&self, key: K, value: V) -> Option<V> {
615        let count = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
616        
617        // Check if we need to promote
618        if count >= self.promotion_threshold {
619            self.promote();
620        }
621        
622        self.hot.insert(key, value)
623    }
624    
625    /// Get a value (hot tier first, then cold)
626    pub fn get(&self, key: &K) -> Option<V> {
627        // Check hot tier first
628        if let Some(value) = self.hot.get(key) {
629            return Some(value);
630        }
631        
632        // Fall back to cold tier
633        self.cold.get(key)
634    }
635    
636    /// Promote hot entries to cold storage
637    pub fn promote(&self) {
638        let entries = self.hot.drain();
639        
640        if !entries.is_empty() {
641            self.cold.insert_batch(entries);
642            self.insert_count.store(0, AtomicOrdering::Release);
643        }
644    }
645    
646    /// Get hot tier size
647    pub fn hot_size(&self) -> usize {
648        self.hot.len()
649    }
650}
651
652/// Trait for cold storage backends
653pub trait ColdStorage<K, V>: Send + Sync {
654    /// Get a value
655    fn get(&self, key: &K) -> Option<V>;
656    
657    /// Insert a batch of entries
658    fn insert_batch(&self, entries: Vec<(K, V)>);
659}
660
661// ============================================================================
662// Simple In-Memory Cold Storage (for testing)
663// ============================================================================
664
665/// Simple hashmap-based cold storage
666pub struct HashMapCold<K, V> {
667    data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
668}
669
670impl<K, V> HashMapCold<K, V>
671where
672    K: Eq + std::hash::Hash + Clone,
673{
674    /// Create new cold storage
675    pub fn new() -> Self {
676        Self {
677            data: parking_lot::RwLock::new(std::collections::HashMap::new()),
678        }
679    }
680}
681
682impl<K, V> Default for HashMapCold<K, V>
683where
684    K: Eq + std::hash::Hash + Clone,
685{
686    fn default() -> Self {
687        Self::new()
688    }
689}
690
691impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
692where
693    K: Eq + std::hash::Hash + Clone + Send + Sync,
694    V: Clone + Send + Sync,
695{
696    fn get(&self, key: &K) -> Option<V> {
697        self.data.read().get(key).cloned()
698    }
699    
700    fn insert_batch(&self, entries: Vec<(K, V)>) {
701        let mut data = self.data.write();
702        for (k, v) in entries {
703            data.insert(k, v);
704        }
705    }
706}
707
708#[cfg(test)]
709mod tests {
710    use super::*;
711    use std::sync::Arc;
712    use std::thread;
713    
714    #[test]
715    fn test_skiplist_basic() {
716        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();
717        
718        assert!(list.insert(1, "one".to_string()).is_none());
719        assert!(list.insert(2, "two".to_string()).is_none());
720        assert!(list.insert(3, "three".to_string()).is_none());
721        
722        assert_eq!(list.len(), 3);
723        assert_eq!(list.get(&1), Some("one".to_string()));
724        assert_eq!(list.get(&2), Some("two".to_string()));
725        assert_eq!(list.get(&3), Some("three".to_string()));
726        assert_eq!(list.get(&4), None);
727    }
728    
729    #[test]
730    fn test_skiplist_update() {
731        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();
732        
733        assert!(list.insert(1, "one".to_string()).is_none());
734        assert_eq!(list.insert(1, "ONE".to_string()), Some("one".to_string()));
735        
736        assert_eq!(list.len(), 1);
737        assert_eq!(list.get(&1), Some("ONE".to_string()));
738    }
739    
740    #[test]
741    fn test_skiplist_concurrent() {
742        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100000));
743        let mut handles = vec![];
744        
745        for t in 0..4 {
746            let list_clone = list.clone();
747            handles.push(thread::spawn(move || {
748                for i in 0..1000 {
749                    let key = t * 1000 + i;
750                    list_clone.insert(key, key * 2);
751                }
752            }));
753        }
754        
755        for handle in handles {
756            handle.join().unwrap();
757        }
758        
759        assert_eq!(list.len(), 4000);
760        
761        // Verify some values
762        assert_eq!(list.get(&0), Some(0));
763        assert_eq!(list.get(&1000), Some(2000));
764        assert_eq!(list.get(&2000), Some(4000));
765    }
766    
767    #[test]
768    fn test_skiplist_drain() {
769        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();
770        
771        for i in 0..100 {
772            list.insert(i, i * 2);
773        }
774        
775        let entries = list.drain();
776        assert_eq!(entries.len(), 100);
777        
778        // Should be sorted
779        for (i, (k, v)) in entries.iter().enumerate() {
780            assert_eq!(*k, i as i32);
781            assert_eq!(*v, (i * 2) as i32);
782        }
783    }
784    
785    #[test]
786    fn test_batch_promoter() {
787        let promoter = BatchPromoter::<i32, i32>::new(100);
788        
789        for i in 0..50 {
790            promoter.insert_hot(i, i * 2);
791        }
792        
793        assert_eq!(promoter.hot_size(), 50);
794        assert_eq!(promoter.get(&10), Some(20));
795        
796        let promoted = promoter.force_promote();
797        assert_eq!(promoted.len(), 50);
798        
799        let stats = promoter.stats();
800        assert_eq!(stats.promotion_count, 1);
801        assert_eq!(stats.entries_promoted, 50);
802    }
803    
804    #[test]
805    fn test_deferred_index() {
806        let cold = HashMapCold::<i32, i32>::new();
807        let index = DeferredIndex::new(cold, 10);
808        
809        // Insert hot keys
810        for i in 0..5 {
811            index.insert(i, i * 10);
812        }
813        
814        // Get from hot tier
815        assert_eq!(index.get(&3), Some(30));
816        
817        // Insert more to trigger promotion
818        for i in 5..15 {
819            index.insert(i, i * 10);
820        }
821        
822        // After promotion, should still find values
823        assert_eq!(index.get(&0), Some(0)); // Was promoted
824        assert_eq!(index.get(&12), Some(120)); // Was promoted or still hot
825    }
826    
827    #[test]
828    fn test_hot_key_absorption() {
829        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();
830        
831        // Simulate hot key pattern
832        for _ in 0..100 {
833            list.insert("hot_key".to_string(), 42);
834        }
835        
836        // Should only have one entry (absorbed)
837        assert_eq!(list.len(), 1);
838        assert_eq!(list.get(&"hot_key".to_string()), Some(42));
839    }
840}