Skip to main content

sochdb_storage/
stratified_skiplist.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Stratified SkipList with Deferred LSM Promotion (Task 2)
19//!
20//! This module provides a probabilistic hot-key buffer with CAS-based insertion
21//! and batched promotion to the underlying LSM tree.
22//!
23//! ## Problem
24//!
25//! Every insert traverses the full memtable skiplist, even for frequently updated
26//! keys that will be overwritten soon. Hot keys cause repeated work.
27//!
28//! ## Solution
29//!
30//! Two-level structure:
31//! 1. **Hot Buffer:** Small, CAS-based skiplist for recent writes
32//! 2. **Cold Storage:** Full memtable (promoted in batches)
33//!
34//! Hot keys are absorbed by the buffer; only final values get promoted.
35//!
36//! ## Performance
37//!
38//! | Metric | Before | After |
39//! |--------|--------|-------|
40//! | Hot key update | O(log N) | O(log H) where H << N |
41//! | Promotion overhead | Per-op | Batched (amortized) |
42//! | Lock contention | High | CAS-based (lock-free) |
43
44use std::cmp::Ordering;
45use std::ptr;
46use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
47use std::sync::Arc;
48
/// Number of forward-pointer slots in every tower; no node can be taller than this.
const MAX_HEIGHT: usize = 16;

/// Denominator of the growth probability used when picking a node height:
/// each additional level is taken with probability `1 / P` (here 1/4).
const P: u32 = 4;

/// Element count at which a list built with `new()` attempts promotion.
const DEFAULT_HOT_CAPACITY: usize = 4096;
57
58// ============================================================================
59// Skip Node (Lock-Free Tower)
60// ============================================================================
61
62/// Tower of forward pointers for a skip node
63#[repr(C)]
64struct Tower<K, V> {
65    /// Height of this tower (1..=MAX_HEIGHT)
66    height: usize,
67    /// Forward pointers at each level
68    next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
69}
70
71impl<K, V> Tower<K, V> {
72    /// Create a new tower with the given height
73    fn new(height: usize) -> Self {
74        let mut next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT] =
75            std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
76        
77        // Initialize all pointers to null
78        for ptr in next.iter_mut().take(height) {
79            *ptr = AtomicPtr::new(ptr::null_mut());
80        }
81        
82        Self { height, next }
83    }
84    
85    /// Get the forward pointer at a level
86    #[inline]
87    fn get(&self, level: usize) -> *mut SkipNode<K, V> {
88        self.next[level].load(AtomicOrdering::Acquire)
89    }
90    
91    /// Set the forward pointer at a level
92    #[inline]
93    fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
94        self.next[level].store(node, AtomicOrdering::Release);
95    }
96    
97    /// CAS the forward pointer at a level
98    #[inline]
99    fn cas(
100        &self,
101        level: usize,
102        expected: *mut SkipNode<K, V>,
103        new: *mut SkipNode<K, V>,
104    ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
105        self.next[level]
106            .compare_exchange(expected, new, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
107    }
108}
109
110/// A node in the skip list
111#[repr(C)]
112struct SkipNode<K, V> {
113    /// The key
114    key: K,
115    /// The value (can be updated atomically)
116    value: AtomicPtr<V>,
117    /// Version counter for optimistic concurrency
118    version: AtomicU64,
119    /// Tower of forward pointers
120    tower: Tower<K, V>,
121}
122
123impl<K, V> SkipNode<K, V> {
124    /// Create a new skip node
125    fn new(key: K, value: V, height: usize) -> *mut Self {
126        let value_ptr = Box::into_raw(Box::new(value));
127        let node = Box::new(Self {
128            key,
129            value: AtomicPtr::new(value_ptr),
130            version: AtomicU64::new(1),
131            tower: Tower::new(height),
132        });
133        Box::into_raw(node)
134    }
135    
136    /// Get the current value
137    #[inline]
138    unsafe fn get_value(&self) -> &V {
139        unsafe { &*self.value.load(AtomicOrdering::Acquire) }
140    }
141    
142    /// Update the value (CAS-based)
143    #[inline]
144    unsafe fn update_value(&self, new_value: V) -> V {
145        let new_ptr = Box::into_raw(Box::new(new_value));
146        let old_ptr = self.value.swap(new_ptr, AtomicOrdering::AcqRel);
147        self.version.fetch_add(1, AtomicOrdering::Release);
148        unsafe { *Box::from_raw(old_ptr) }
149    }
150}
151
152// ============================================================================
153// Stratified SkipList
154// ============================================================================
155
156/// Thread-local random state for height selection
157fn random_height() -> usize {
158    let mut height = 1;
159    // Use a simple PRNG based on pointer address and time
160    let mut state = (&height as *const _ as u64)
161        .wrapping_mul(0x9E3779B97F4A7C15)
162        .wrapping_add(
163            std::time::SystemTime::now()
164                .duration_since(std::time::UNIX_EPOCH)
165                .map(|d| d.as_nanos() as u64)
166                .unwrap_or(0),
167        );
168    
169    while height < MAX_HEIGHT {
170        state = state.wrapping_mul(1103515245).wrapping_add(12345);
171        if (state >> 17) % (P as u64) != 0 {
172            break;
173        }
174        height += 1;
175    }
176    height
177}
178
179/// Stratified skip list with lock-free operations
180pub struct StratifiedSkipList<K, V>
181where
182    K: Ord + Clone,
183    V: Clone,
184{
185    /// Head sentinel node
186    head: *mut SkipNode<K, V>,
187    /// Current maximum height
188    max_height: AtomicUsize,
189    /// Number of elements
190    len: AtomicUsize,
191    /// Capacity before promotion
192    capacity: usize,
193    /// Promotion callback
194    promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
195}
196
197impl<K, V> StratifiedSkipList<K, V>
198where
199    K: Ord + Clone + Default,
200    V: Clone + Default,
201{
202    /// Create a new stratified skip list
203    pub fn new() -> Self {
204        Self::with_capacity(DEFAULT_HOT_CAPACITY)
205    }
206    
207    /// Create with custom capacity
208    pub fn with_capacity(capacity: usize) -> Self {
209        // Create head sentinel with default values
210        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
211        
212        Self {
213            head,
214            max_height: AtomicUsize::new(1),
215            len: AtomicUsize::new(0),
216            capacity,
217            promoter: None,
218        }
219    }
220    
221    /// Set the promotion callback
222    pub fn set_promoter<F>(&mut self, promoter: F)
223    where
224        F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
225    {
226        self.promoter = Some(Arc::new(promoter));
227    }
228    
229    /// Insert or update a key-value pair (CAS-based)
230    ///
231    /// Returns the old value if the key existed.
232    pub fn insert(&self, key: K, value: V) -> Option<V> {
233        // Check if we need to promote before inserting
234        if self.len() >= self.capacity {
235            self.try_promote();
236        }
237        
238        let height = random_height();
239        let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
240        let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
241        
242        loop {
243            // Find position
244            self.find_position(&key, &mut prev, &mut next);
245            
246            // Check if key exists
247            if !next[0].is_null() {
248                let existing = unsafe { &*next[0] };
249                if existing.key == key {
250                    // Update existing value
251                    let old = unsafe { existing.update_value(value.clone()) };
252                    return Some(old);
253                }
254            }
255            
256            // Create new node
257            let new_node = SkipNode::new(key.clone(), value.clone(), height);
258            
259            // Link at level 0 first
260            unsafe {
261                (*new_node).tower.set(0, next[0]);
262            }
263            
264            // CAS at level 0
265            let prev_ptr = if prev[0].is_null() { self.head } else { prev[0] };
266            match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
267                Ok(_) => {
268                    // Successfully inserted at level 0
269                    // Now link higher levels
270                    for level in 1..height {
271                        loop {
272                            unsafe {
273                                (*new_node).tower.set(level, next[level]);
274                            }
275                            
276                            let prev_at_level = if prev[level].is_null() { self.head } else { prev[level] };
277                            if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }.is_ok() {
278                                break;
279                            }
280                            
281                            // Re-find position at this level
282                            self.find_position(&key, &mut prev, &mut next);
283                        }
284                    }
285                    
286                    // Update max height
287                    loop {
288                        let current_max = self.max_height.load(AtomicOrdering::Relaxed);
289                        if height <= current_max {
290                            break;
291                        }
292                        if self.max_height
293                            .compare_exchange_weak(current_max, height, AtomicOrdering::Release, AtomicOrdering::Relaxed)
294                            .is_ok()
295                        {
296                            break;
297                        }
298                    }
299                    
300                    self.len.fetch_add(1, AtomicOrdering::Release);
301                    return None;
302                }
303                Err(_) => {
304                    // CAS failed, someone else inserted
305                    // Free the node we created and retry
306                    unsafe {
307                        let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
308                        drop(Box::from_raw(value_ptr));
309                        drop(Box::from_raw(new_node));
310                    }
311                    continue;
312                }
313            }
314        }
315    }
316    
317    /// Find the position for a key
318    fn find_position(
319        &self,
320        key: &K,
321        prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
322        next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
323    ) {
324        let max_height = self.max_height.load(AtomicOrdering::Acquire);
325        let mut current = self.head;
326        
327        for level in (0..max_height).rev() {
328            loop {
329                let next_node = unsafe { (*current).tower.get(level) };
330                if next_node.is_null() {
331                    break;
332                }
333                
334                let next_key = unsafe { &(*next_node).key };
335                match next_key.cmp(key) {
336                    Ordering::Less => {
337                        current = next_node;
338                    }
339                    Ordering::Equal | Ordering::Greater => {
340                        break;
341                    }
342                }
343            }
344            
345            prev[level] = if current == self.head { ptr::null_mut() } else { current };
346            next[level] = unsafe { (*current).tower.get(level) };
347        }
348    }
349    
350    /// Get a value by key
351    pub fn get(&self, key: &K) -> Option<V> {
352        let max_height = self.max_height.load(AtomicOrdering::Acquire);
353        let mut current = self.head;
354        
355        for level in (0..max_height).rev() {
356            loop {
357                let next_node = unsafe { (*current).tower.get(level) };
358                if next_node.is_null() {
359                    break;
360                }
361                
362                let next_key = unsafe { &(*next_node).key };
363                match next_key.cmp(key) {
364                    Ordering::Less => {
365                        current = next_node;
366                    }
367                    Ordering::Equal => {
368                        let value = unsafe { (*next_node).get_value().clone() };
369                        return Some(value);
370                    }
371                    Ordering::Greater => {
372                        break;
373                    }
374                }
375            }
376        }
377        
378        None
379    }
380    
381    /// Get the number of elements
382    #[inline]
383    pub fn len(&self) -> usize {
384        self.len.load(AtomicOrdering::Acquire)
385    }
386    
387    /// Check if empty
388    #[inline]
389    pub fn is_empty(&self) -> bool {
390        self.len() == 0
391    }
392    
393    /// Try to promote entries to the underlying storage
394    fn try_promote(&self) {
395        if let Some(ref promoter) = self.promoter {
396            let entries = self.drain();
397            if !entries.is_empty() {
398                promoter(entries);
399            }
400        }
401    }
402    
403    /// Drain all entries for promotion
404    pub fn drain(&self) -> Vec<(K, V)> {
405        let mut entries = Vec::with_capacity(self.len());
406        let mut current = unsafe { (*self.head).tower.get(0) };
407        
408        while !current.is_null() {
409            let node = unsafe { &*current };
410            let key = node.key.clone();
411            let value = unsafe { node.get_value().clone() };
412            entries.push((key, value));
413            current = node.tower.get(0);
414        }
415        
416        // Note: In production, we'd use proper marking for deletion
417        // For now, just reset the count (simplified)
418        self.len.store(0, AtomicOrdering::Release);
419        
420        entries
421    }
422    
423    /// Iterate over all entries (read-only)
424    pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
425        SkipListIter {
426            current: unsafe { (*self.head).tower.get(0) },
427            _marker: std::marker::PhantomData,
428        }
429    }
430}
431
432impl<K, V> Default for StratifiedSkipList<K, V>
433where
434    K: Ord + Clone + Default,
435    V: Clone + Default,
436{
437    fn default() -> Self {
438        Self::new()
439    }
440}
441
442impl<K, V> Drop for StratifiedSkipList<K, V>
443where
444    K: Ord + Clone,
445    V: Clone,
446{
447    fn drop(&mut self) {
448        let mut current = unsafe { (*self.head).tower.get(0) };
449        
450        while !current.is_null() {
451            let next = unsafe { (*current).tower.get(0) };
452            unsafe {
453                let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
454                drop(Box::from_raw(value_ptr));
455                drop(Box::from_raw(current));
456            }
457            current = next;
458        }
459        
460        // Free head
461        unsafe {
462            let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
463            drop(Box::from_raw(value_ptr));
464            drop(Box::from_raw(self.head));
465        }
466    }
467}
468
469// Safety: StratifiedSkipList uses atomic operations for all mutations
470unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Send for StratifiedSkipList<K, V> {}
471unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Sync for StratifiedSkipList<K, V> {}
472
473/// Iterator over skip list entries
474struct SkipListIter<'a, K, V> {
475    current: *mut SkipNode<K, V>,
476    _marker: std::marker::PhantomData<&'a ()>,
477}
478
479impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
480    type Item = (K, V);
481    
482    fn next(&mut self) -> Option<Self::Item> {
483        if self.current.is_null() {
484            return None;
485        }
486        
487        let node = unsafe { &*self.current };
488        let key = node.key.clone();
489        let value = unsafe { node.get_value().clone() };
490        self.current = node.tower.get(0);
491        
492        Some((key, value))
493    }
494}
495
496// ============================================================================
497// Batch Promoter
498// ============================================================================
499
/// Running counters describing forced promotions.
#[derive(Debug, Clone, Default)]
pub struct PromotionStats {
    /// How many non-empty batches have been promoted.
    pub promotion_count: u64,
    /// Sum of the sizes of all promoted batches.
    pub entries_promoted: u64,
    /// `entries_promoted / promotion_count`, refreshed on each promotion.
    pub avg_batch_size: f64,
}
510
511/// Batch promoter for deferred LSM promotion
512pub struct BatchPromoter<K, V>
513where
514    K: Ord + Clone + Default,
515    V: Clone + Default,
516{
517    /// Hot buffer (stratified skip list)
518    hot_buffer: StratifiedSkipList<K, V>,
519    /// Pending promotion batches
520    #[allow(dead_code)]
521    pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
522    /// Statistics
523    stats: std::sync::Mutex<PromotionStats>,
524    /// Background promoter thread handle
525    _background: Option<std::thread::JoinHandle<()>>,
526}
527
528impl<K, V> BatchPromoter<K, V>
529where
530    K: Ord + Clone + Default + Send + Sync + 'static,
531    V: Clone + Default + Send + Sync + 'static,
532{
533    /// Create a new batch promoter
534    pub fn new(hot_capacity: usize) -> Arc<Self> {
535        let promoter = Arc::new(Self {
536            hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
537            pending: std::sync::Mutex::new(Vec::new()),
538            stats: std::sync::Mutex::new(PromotionStats::default()),
539            _background: None,
540        });
541        
542        promoter
543    }
544    
545    /// Insert a hot key
546    pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
547        self.hot_buffer.insert(key, value)
548    }
549    
550    /// Get a value (checks hot buffer first)
551    pub fn get(&self, key: &K) -> Option<V> {
552        self.hot_buffer.get(key)
553    }
554    
555    /// Force promotion of all hot entries
556    pub fn force_promote(&self) -> Vec<(K, V)> {
557        let entries = self.hot_buffer.drain();
558        
559        if !entries.is_empty() {
560            let mut stats = self.stats.lock().unwrap();
561            stats.promotion_count += 1;
562            stats.entries_promoted += entries.len() as u64;
563            stats.avg_batch_size = stats.entries_promoted as f64 / stats.promotion_count as f64;
564        }
565        
566        entries
567    }
568    
569    /// Get statistics
570    pub fn stats(&self) -> PromotionStats {
571        self.stats.lock().unwrap().clone()
572    }
573    
574    /// Get hot buffer size
575    pub fn hot_size(&self) -> usize {
576        self.hot_buffer.len()
577    }
578}
579
580// ============================================================================
581// Deferred Index (combines hot buffer with cold storage)
582// ============================================================================
583
584/// Deferred index with stratified storage
585pub struct DeferredIndex<K, V, Cold>
586where
587    K: Ord + Clone + Default,
588    V: Clone + Default,
589{
590    /// Hot tier (stratified skip list)
591    hot: StratifiedSkipList<K, V>,
592    /// Cold tier (underlying storage)
593    cold: Cold,
594    /// Promotion threshold
595    promotion_threshold: usize,
596    /// Insert count since last promotion
597    insert_count: AtomicUsize,
598}
599
600impl<K, V, Cold> DeferredIndex<K, V, Cold>
601where
602    K: Ord + Clone + Default,
603    V: Clone + Default,
604    Cold: ColdStorage<K, V>,
605{
606    /// Create a new deferred index
607    pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
608        Self {
609            hot: StratifiedSkipList::with_capacity(promotion_threshold),
610            cold,
611            promotion_threshold,
612            insert_count: AtomicUsize::new(0),
613        }
614    }
615    
616    /// Insert a key-value pair
617    pub fn insert(&self, key: K, value: V) -> Option<V> {
618        let count = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
619        
620        // Check if we need to promote
621        if count >= self.promotion_threshold {
622            self.promote();
623        }
624        
625        self.hot.insert(key, value)
626    }
627    
628    /// Get a value (hot tier first, then cold)
629    pub fn get(&self, key: &K) -> Option<V> {
630        // Check hot tier first
631        if let Some(value) = self.hot.get(key) {
632            return Some(value);
633        }
634        
635        // Fall back to cold tier
636        self.cold.get(key)
637    }
638    
639    /// Promote hot entries to cold storage
640    pub fn promote(&self) {
641        let entries = self.hot.drain();
642        
643        if !entries.is_empty() {
644            self.cold.insert_batch(entries);
645            self.insert_count.store(0, AtomicOrdering::Release);
646        }
647    }
648    
649    /// Get hot tier size
650    pub fn hot_size(&self) -> usize {
651        self.hot.len()
652    }
653}
654
/// Contract a cold-tier backend has to fulfil.
///
/// Implementors must be `Send + Sync`; both methods take `&self`.
pub trait ColdStorage<K, V>: Send + Sync {
    /// Returns the stored value for `key`, or `None` when it is absent.
    fn get(&self, key: &K) -> Option<V>;

    /// Stores every `(key, value)` pair of `entries`.
    fn insert_batch(&self, entries: Vec<(K, V)>);
}
663
664// ============================================================================
665// Simple In-Memory Cold Storage (for testing)
666// ============================================================================
667
668/// Simple hashmap-based cold storage
669pub struct HashMapCold<K, V> {
670    data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
671}
672
673impl<K, V> HashMapCold<K, V>
674where
675    K: Eq + std::hash::Hash + Clone,
676{
677    /// Create new cold storage
678    pub fn new() -> Self {
679        Self {
680            data: parking_lot::RwLock::new(std::collections::HashMap::new()),
681        }
682    }
683}
684
685impl<K, V> Default for HashMapCold<K, V>
686where
687    K: Eq + std::hash::Hash + Clone,
688{
689    fn default() -> Self {
690        Self::new()
691    }
692}
693
694impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
695where
696    K: Eq + std::hash::Hash + Clone + Send + Sync,
697    V: Clone + Send + Sync,
698{
699    fn get(&self, key: &K) -> Option<V> {
700        self.data.read().get(key).cloned()
701    }
702    
703    fn insert_batch(&self, entries: Vec<(K, V)>) {
704        let mut data = self.data.write();
705        for (k, v) in entries {
706            data.insert(k, v);
707        }
708    }
709}
710
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Distinct keys can be stored and read back; a missing key yields `None`.
    #[test]
    fn test_skiplist_basic() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();
        let samples = [(1, "one"), (2, "two"), (3, "three")];

        for (key, word) in samples {
            assert!(list.insert(key, word.to_string()).is_none());
        }

        assert_eq!(list.len(), 3);
        for (key, word) in samples {
            assert_eq!(list.get(&key), Some(word.to_string()));
        }
        assert_eq!(list.get(&4), None);
    }

    /// Re-inserting a key returns the old value and does not grow the list.
    #[test]
    fn test_skiplist_update() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        let first = list.insert(1, "one".to_string());
        assert!(first.is_none());
        let second = list.insert(1, "ONE".to_string());
        assert_eq!(second, Some("one".to_string()));

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&1), Some("ONE".to_string()));
    }

    /// Four writers with disjoint key ranges all land their entries.
    #[test]
    fn test_skiplist_concurrent() {
        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100000));

        let handles: Vec<_> = (0..4)
            .map(|t| {
                let shared = Arc::clone(&list);
                thread::spawn(move || {
                    for i in 0..1000 {
                        let key = t * 1000 + i;
                        shared.insert(key, key * 2);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(list.len(), 4000);

        // Spot-check the first key of three of the ranges.
        for key in [0, 1000, 2000] {
            assert_eq!(list.get(&key), Some(key * 2));
        }
    }

    /// `drain` returns every entry in ascending key order.
    #[test]
    fn test_skiplist_drain() {
        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();

        (0..100).for_each(|i| {
            list.insert(i, i * 2);
        });

        let entries = list.drain();
        assert_eq!(entries.len(), 100);

        for (expected, (k, v)) in (0i32..).zip(entries.iter()) {
            assert_eq!(*k, expected);
            assert_eq!(*v, expected * 2);
        }
    }

    /// A forced promotion returns the batch and updates the counters.
    #[test]
    fn test_batch_promoter() {
        let promoter = BatchPromoter::<i32, i32>::new(100);

        (0..50).for_each(|i| {
            promoter.insert_hot(i, i * 2);
        });

        assert_eq!(promoter.hot_size(), 50);
        assert_eq!(promoter.get(&10), Some(20));

        let promoted = promoter.force_promote();
        assert_eq!(promoted.len(), 50);

        let stats = promoter.stats();
        assert_eq!(stats.promotion_count, 1);
        assert_eq!(stats.entries_promoted, 50);
    }

    /// Values stay reachable through the index across a promotion.
    #[test]
    fn test_deferred_index() {
        let index = DeferredIndex::new(HashMapCold::<i32, i32>::new(), 10);

        // Stay below the threshold first.
        (0..5).for_each(|i| {
            index.insert(i, i * 10);
        });
        assert_eq!(index.get(&3), Some(30));

        // Cross the threshold so a promotion runs.
        (5..15).for_each(|i| {
            index.insert(i, i * 10);
        });

        assert_eq!(index.get(&0), Some(0)); // promoted
        assert_eq!(index.get(&12), Some(120)); // promoted or still hot
    }

    /// Repeated writes to one key occupy a single entry.
    #[test]
    fn test_hot_key_absorption() {
        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();

        for _ in 0..100 {
            list.insert(String::from("hot_key"), 42);
        }

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&String::from("hot_key")), Some(42));
    }
}
839        // Should only have one entry (absorbed)
840        assert_eq!(list.len(), 1);
841        assert_eq!(list.get(&"hot_key".to_string()), Some(42));
842    }
843}