Skip to main content

sochdb_storage/
stratified_skiplist.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Stratified SkipList with Deferred LSM Promotion (Task 2)
19//!
20//! This module provides a probabilistic hot-key buffer with CAS-based insertion
21//! and batched promotion to the underlying LSM tree.
22//!
23//! ## Problem
24//!
25//! Every insert traverses the full memtable skiplist, even for frequently updated
26//! keys that will be overwritten soon. Hot keys cause repeated work.
27//!
28//! ## Solution
29//!
30//! Two-level structure:
31//! 1. **Hot Buffer:** Small, CAS-based skiplist for recent writes
32//! 2. **Cold Storage:** Full memtable (promoted in batches)
33//!
34//! Hot keys are absorbed by the buffer; only final values get promoted.
35//!
36//! ## Performance
37//!
38//! | Metric | Before | After |
39//! |--------|--------|-------|
40//! | Hot key update | O(log N) | O(log H) where H << N |
41//! | Promotion overhead | Per-op | Batched (amortized) |
42//! | Lock contention | High | CAS-based (lock-free) |
43
44use std::cmp::Ordering;
45use std::ptr;
46use std::sync::Arc;
47use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
48
/// Maximum skiplist height.
///
/// Every tower allocates this many forward-pointer slots, and
/// `random_height` never returns more than this value.
const MAX_HEIGHT: usize = 16;

/// Denominator of the level-promotion probability: `random_height` grows a
/// tower by one more level with probability `1 / P` (1/4).
const P: u32 = 4;

/// Default hot buffer capacity before promotion.
///
/// Used by `StratifiedSkipList::new`; once the element counter reaches the
/// capacity, `insert` attempts a promotion before inserting.
const DEFAULT_HOT_CAPACITY: usize = 4096;
57
58// ============================================================================
59// Skip Node (Lock-Free Tower)
60// ============================================================================
61
62/// Tower of forward pointers for a skip node
63#[repr(C)]
64struct Tower<K, V> {
65    /// Height of this tower (1..=MAX_HEIGHT)
66    height: usize,
67    /// Forward pointers at each level
68    next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
69}
70
71impl<K, V> Tower<K, V> {
72    /// Create a new tower with the given height
73    fn new(height: usize) -> Self {
74        let mut next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT] =
75            std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
76
77        // Initialize all pointers to null
78        for ptr in next.iter_mut().take(height) {
79            *ptr = AtomicPtr::new(ptr::null_mut());
80        }
81
82        Self { height, next }
83    }
84
85    /// Get the forward pointer at a level
86    #[inline]
87    fn get(&self, level: usize) -> *mut SkipNode<K, V> {
88        self.next[level].load(AtomicOrdering::Acquire)
89    }
90
91    /// Set the forward pointer at a level
92    #[inline]
93    fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
94        self.next[level].store(node, AtomicOrdering::Release);
95    }
96
97    /// CAS the forward pointer at a level
98    #[inline]
99    fn cas(
100        &self,
101        level: usize,
102        expected: *mut SkipNode<K, V>,
103        new: *mut SkipNode<K, V>,
104    ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
105        self.next[level].compare_exchange(
106            expected,
107            new,
108            AtomicOrdering::AcqRel,
109            AtomicOrdering::Acquire,
110        )
111    }
112}
113
114/// A node in the skip list
115#[repr(C)]
116struct SkipNode<K, V> {
117    /// The key
118    key: K,
119    /// The value (can be updated atomically)
120    value: AtomicPtr<V>,
121    /// Version counter for optimistic concurrency
122    version: AtomicU64,
123    /// Tower of forward pointers
124    tower: Tower<K, V>,
125}
126
127impl<K, V> SkipNode<K, V> {
128    /// Create a new skip node
129    fn new(key: K, value: V, height: usize) -> *mut Self {
130        let value_ptr = Box::into_raw(Box::new(value));
131        let node = Box::new(Self {
132            key,
133            value: AtomicPtr::new(value_ptr),
134            version: AtomicU64::new(1),
135            tower: Tower::new(height),
136        });
137        Box::into_raw(node)
138    }
139
140    /// Get the current value
141    #[inline]
142    unsafe fn get_value(&self) -> &V {
143        unsafe { &*self.value.load(AtomicOrdering::Acquire) }
144    }
145
146    /// Update the value (CAS-based)
147    #[inline]
148    unsafe fn update_value(&self, new_value: V) -> V {
149        let new_ptr = Box::into_raw(Box::new(new_value));
150        let old_ptr = self.value.swap(new_ptr, AtomicOrdering::AcqRel);
151        self.version.fetch_add(1, AtomicOrdering::Release);
152        unsafe { *Box::from_raw(old_ptr) }
153    }
154}
155
156// ============================================================================
157// Stratified SkipList
158// ============================================================================
159
160/// Thread-local random state for height selection
161fn random_height() -> usize {
162    let mut height = 1;
163    // Use a simple PRNG based on pointer address and time
164    let mut state = (&height as *const _ as u64)
165        .wrapping_mul(0x9E3779B97F4A7C15)
166        .wrapping_add(
167            std::time::SystemTime::now()
168                .duration_since(std::time::UNIX_EPOCH)
169                .map(|d| d.as_nanos() as u64)
170                .unwrap_or(0),
171        );
172
173    while height < MAX_HEIGHT {
174        state = state.wrapping_mul(1103515245).wrapping_add(12345);
175        if (state >> 17) % (P as u64) != 0 {
176            break;
177        }
178        height += 1;
179    }
180    height
181}
182
/// Stratified skip list with lock-free operations.
///
/// Nodes are linked through raw pointers and atomics. Insertion and lookup
/// take `&self`; the list frees its nodes when it is dropped.
pub struct StratifiedSkipList<K, V>
where
    K: Ord + Clone,
    V: Clone,
{
    /// Head sentinel node (allocated with a full-height tower; its own
    /// key and value are placeholders)
    head: *mut SkipNode<K, V>,
    /// Current maximum height; lookups start their descent from this level
    max_height: AtomicUsize,
    /// Number of elements counted since the last `drain`
    len: AtomicUsize,
    /// Capacity before promotion: `insert` attempts a promotion once `len`
    /// reaches this value
    capacity: usize,
    /// Promotion callback, invoked with the drained entries (if set)
    promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
}
200
201impl<K, V> StratifiedSkipList<K, V>
202where
203    K: Ord + Clone + Default,
204    V: Clone + Default,
205{
206    /// Create a new stratified skip list
207    pub fn new() -> Self {
208        Self::with_capacity(DEFAULT_HOT_CAPACITY)
209    }
210
211    /// Create with custom capacity
212    pub fn with_capacity(capacity: usize) -> Self {
213        // Create head sentinel with default values
214        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
215
216        Self {
217            head,
218            max_height: AtomicUsize::new(1),
219            len: AtomicUsize::new(0),
220            capacity,
221            promoter: None,
222        }
223    }
224
225    /// Set the promotion callback
226    pub fn set_promoter<F>(&mut self, promoter: F)
227    where
228        F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
229    {
230        self.promoter = Some(Arc::new(promoter));
231    }
232
233    /// Insert or update a key-value pair (CAS-based)
234    ///
235    /// Returns the old value if the key existed.
236    pub fn insert(&self, key: K, value: V) -> Option<V> {
237        // Check if we need to promote before inserting
238        if self.len() >= self.capacity {
239            self.try_promote();
240        }
241
242        let height = random_height();
243        let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
244        let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
245
246        loop {
247            // Find position
248            self.find_position(&key, &mut prev, &mut next);
249
250            // Check if key exists
251            if !next[0].is_null() {
252                let existing = unsafe { &*next[0] };
253                if existing.key == key {
254                    // Update existing value
255                    let old = unsafe { existing.update_value(value.clone()) };
256                    return Some(old);
257                }
258            }
259
260            // Create new node
261            let new_node = SkipNode::new(key.clone(), value.clone(), height);
262
263            // Link at level 0 first
264            unsafe {
265                (*new_node).tower.set(0, next[0]);
266            }
267
268            // CAS at level 0
269            let prev_ptr = if prev[0].is_null() {
270                self.head
271            } else {
272                prev[0]
273            };
274            match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
275                Ok(_) => {
276                    // Successfully inserted at level 0
277                    // Now link higher levels
278                    for level in 1..height {
279                        loop {
280                            unsafe {
281                                (*new_node).tower.set(level, next[level]);
282                            }
283
284                            let prev_at_level = if prev[level].is_null() {
285                                self.head
286                            } else {
287                                prev[level]
288                            };
289                            if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }
290                                .is_ok()
291                            {
292                                break;
293                            }
294
295                            // Re-find position at this level
296                            self.find_position(&key, &mut prev, &mut next);
297                        }
298                    }
299
300                    // Update max height
301                    loop {
302                        let current_max = self.max_height.load(AtomicOrdering::Relaxed);
303                        if height <= current_max {
304                            break;
305                        }
306                        if self
307                            .max_height
308                            .compare_exchange_weak(
309                                current_max,
310                                height,
311                                AtomicOrdering::Release,
312                                AtomicOrdering::Relaxed,
313                            )
314                            .is_ok()
315                        {
316                            break;
317                        }
318                    }
319
320                    self.len.fetch_add(1, AtomicOrdering::Release);
321                    return None;
322                }
323                Err(_) => {
324                    // CAS failed, someone else inserted
325                    // Free the node we created and retry
326                    unsafe {
327                        let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
328                        drop(Box::from_raw(value_ptr));
329                        drop(Box::from_raw(new_node));
330                    }
331                    continue;
332                }
333            }
334        }
335    }
336
337    /// Find the position for a key
338    fn find_position(
339        &self,
340        key: &K,
341        prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
342        next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
343    ) {
344        let max_height = self.max_height.load(AtomicOrdering::Acquire);
345        let mut current = self.head;
346
347        for level in (0..max_height).rev() {
348            // `succ` MUST be the successor the loop actually compared against
349            // (first node with key >= `key`, or null) — NOT a fresh re-read of
350            // current.next afterwards. A concurrent insert can splice a node
351            // with key < `key` between `current` and the old successor in the
352            // window between the loop deciding to stop and a re-read; capturing
353            // that as next[level] makes `insert` link the new node BEFORE the
354            // smaller-key node, mis-ordering the level-0 chain and orphaning it
355            // (counted in len, unreachable to ordered get). Keeping next[]
356            // consistent with the comparison means the level-0 CAS expecting
357            // `succ` fails and retries on exactly that race.
358            let mut succ = unsafe { (*current).tower.get(level) };
359            while !succ.is_null() {
360                let next_key = unsafe { &(*succ).key };
361                match next_key.cmp(key) {
362                    Ordering::Less => {
363                        current = succ;
364                        succ = unsafe { (*current).tower.get(level) };
365                    }
366                    Ordering::Equal | Ordering::Greater => break,
367                }
368            }
369
370            prev[level] = if current == self.head {
371                ptr::null_mut()
372            } else {
373                current
374            };
375            next[level] = succ;
376        }
377    }
378
379    /// Get a value by key
380    pub fn get(&self, key: &K) -> Option<V> {
381        let max_height = self.max_height.load(AtomicOrdering::Acquire);
382        let mut current = self.head;
383
384        for level in (0..max_height).rev() {
385            loop {
386                let next_node = unsafe { (*current).tower.get(level) };
387                if next_node.is_null() {
388                    break;
389                }
390
391                let next_key = unsafe { &(*next_node).key };
392                match next_key.cmp(key) {
393                    Ordering::Less => {
394                        current = next_node;
395                    }
396                    Ordering::Equal => {
397                        let value = unsafe { (*next_node).get_value().clone() };
398                        return Some(value);
399                    }
400                    Ordering::Greater => {
401                        break;
402                    }
403                }
404            }
405        }
406
407        None
408    }
409
410    /// Get the number of elements
411    #[inline]
412    pub fn len(&self) -> usize {
413        self.len.load(AtomicOrdering::Acquire)
414    }
415
416    /// Check if empty
417    #[inline]
418    pub fn is_empty(&self) -> bool {
419        self.len() == 0
420    }
421
422    /// Try to promote entries to the underlying storage
423    fn try_promote(&self) {
424        if let Some(ref promoter) = self.promoter {
425            let entries = self.drain();
426            if !entries.is_empty() {
427                promoter(entries);
428            }
429        }
430    }
431
432    /// Drain all entries for promotion
433    pub fn drain(&self) -> Vec<(K, V)> {
434        let mut entries = Vec::with_capacity(self.len());
435        let mut current = unsafe { (*self.head).tower.get(0) };
436
437        while !current.is_null() {
438            let node = unsafe { &*current };
439            let key = node.key.clone();
440            let value = unsafe { node.get_value().clone() };
441            entries.push((key, value));
442            current = node.tower.get(0);
443        }
444
445        // Note: In production, we'd use proper marking for deletion
446        // For now, just reset the count (simplified)
447        self.len.store(0, AtomicOrdering::Release);
448
449        entries
450    }
451
452    /// Iterate over all entries (read-only)
453    pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
454        SkipListIter {
455            current: unsafe { (*self.head).tower.get(0) },
456            _marker: std::marker::PhantomData,
457        }
458    }
459}
460
461impl<K, V> Default for StratifiedSkipList<K, V>
462where
463    K: Ord + Clone + Default,
464    V: Clone + Default,
465{
466    fn default() -> Self {
467        Self::new()
468    }
469}
470
471impl<K, V> Drop for StratifiedSkipList<K, V>
472where
473    K: Ord + Clone,
474    V: Clone,
475{
476    fn drop(&mut self) {
477        let mut current = unsafe { (*self.head).tower.get(0) };
478
479        while !current.is_null() {
480            let next = unsafe { (*current).tower.get(0) };
481            unsafe {
482                let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
483                drop(Box::from_raw(value_ptr));
484                drop(Box::from_raw(current));
485            }
486            current = next;
487        }
488
489        // Free head
490        unsafe {
491            let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
492            drop(Box::from_raw(value_ptr));
493            drop(Box::from_raw(self.head));
494        }
495    }
496}
497
// Safety: StratifiedSkipList uses atomic operations for all mutations
//
// The struct holds a raw `head` pointer, so the compiler does not derive
// `Send`/`Sync` automatically; these impls opt in manually for keys and
// values that are themselves `Send + Sync`.
//
// NOTE(review): `SkipNode::update_value` frees the replaced value while
// `get`/`drain`/`iter` may still be cloning it from another thread, and no
// deferred reclamation is visible in this file — confirm that concurrent
// update + read of the same key is excluded by callers.
unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Send
    for StratifiedSkipList<K, V>
{
}
unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Sync
    for StratifiedSkipList<K, V>
{
}
507
508/// Iterator over skip list entries
509struct SkipListIter<'a, K, V> {
510    current: *mut SkipNode<K, V>,
511    _marker: std::marker::PhantomData<&'a ()>,
512}
513
514impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
515    type Item = (K, V);
516
517    fn next(&mut self) -> Option<Self::Item> {
518        if self.current.is_null() {
519            return None;
520        }
521
522        let node = unsafe { &*self.current };
523        let key = node.key.clone();
524        let value = unsafe { node.get_value().clone() };
525        self.current = node.tower.get(0);
526
527        Some((key, value))
528    }
529}
530
531// ============================================================================
532// Batch Promoter
533// ============================================================================
534
/// Statistics for promotion.
///
/// Updated by `BatchPromoter::force_promote` for every non-empty batch.
#[derive(Debug, Clone, Default)]
pub struct PromotionStats {
    /// Number of promotions (non-empty batches only)
    pub promotion_count: u64,
    /// Total entries promoted across all batches
    pub entries_promoted: u64,
    /// Average batch size (`entries_promoted / promotion_count`)
    pub avg_batch_size: f64,
}
545
546/// Batch promoter for deferred LSM promotion
547pub struct BatchPromoter<K, V>
548where
549    K: Ord + Clone + Default,
550    V: Clone + Default,
551{
552    /// Hot buffer (stratified skip list)
553    hot_buffer: StratifiedSkipList<K, V>,
554    /// Pending promotion batches
555    #[allow(dead_code)]
556    pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
557    /// Statistics
558    stats: std::sync::Mutex<PromotionStats>,
559    /// Background promoter thread handle
560    _background: Option<std::thread::JoinHandle<()>>,
561}
562
563impl<K, V> BatchPromoter<K, V>
564where
565    K: Ord + Clone + Default + Send + Sync + 'static,
566    V: Clone + Default + Send + Sync + 'static,
567{
568    /// Create a new batch promoter
569    pub fn new(hot_capacity: usize) -> Arc<Self> {
570        let promoter = Arc::new(Self {
571            hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
572            pending: std::sync::Mutex::new(Vec::new()),
573            stats: std::sync::Mutex::new(PromotionStats::default()),
574            _background: None,
575        });
576
577        promoter
578    }
579
580    /// Insert a hot key
581    pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
582        self.hot_buffer.insert(key, value)
583    }
584
585    /// Get a value (checks hot buffer first)
586    pub fn get(&self, key: &K) -> Option<V> {
587        self.hot_buffer.get(key)
588    }
589
590    /// Force promotion of all hot entries
591    pub fn force_promote(&self) -> Vec<(K, V)> {
592        let entries = self.hot_buffer.drain();
593
594        if !entries.is_empty() {
595            let mut stats = self.stats.lock().unwrap();
596            stats.promotion_count += 1;
597            stats.entries_promoted += entries.len() as u64;
598            stats.avg_batch_size = stats.entries_promoted as f64 / stats.promotion_count as f64;
599        }
600
601        entries
602    }
603
604    /// Get statistics
605    pub fn stats(&self) -> PromotionStats {
606        self.stats.lock().unwrap().clone()
607    }
608
609    /// Get hot buffer size
610    pub fn hot_size(&self) -> usize {
611        self.hot_buffer.len()
612    }
613}
614
615// ============================================================================
616// Deferred Index (combines hot buffer with cold storage)
617// ============================================================================
618
619/// Deferred index with stratified storage
620pub struct DeferredIndex<K, V, Cold>
621where
622    K: Ord + Clone + Default,
623    V: Clone + Default,
624{
625    /// Hot tier (stratified skip list)
626    hot: StratifiedSkipList<K, V>,
627    /// Cold tier (underlying storage)
628    cold: Cold,
629    /// Promotion threshold
630    promotion_threshold: usize,
631    /// Insert count since last promotion
632    insert_count: AtomicUsize,
633}
634
635impl<K, V, Cold> DeferredIndex<K, V, Cold>
636where
637    K: Ord + Clone + Default,
638    V: Clone + Default,
639    Cold: ColdStorage<K, V>,
640{
641    /// Create a new deferred index
642    pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
643        Self {
644            hot: StratifiedSkipList::with_capacity(promotion_threshold),
645            cold,
646            promotion_threshold,
647            insert_count: AtomicUsize::new(0),
648        }
649    }
650
651    /// Insert a key-value pair
652    pub fn insert(&self, key: K, value: V) -> Option<V> {
653        let count = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
654
655        // Check if we need to promote
656        if count >= self.promotion_threshold {
657            self.promote();
658        }
659
660        self.hot.insert(key, value)
661    }
662
663    /// Get a value (hot tier first, then cold)
664    pub fn get(&self, key: &K) -> Option<V> {
665        // Check hot tier first
666        if let Some(value) = self.hot.get(key) {
667            return Some(value);
668        }
669
670        // Fall back to cold tier
671        self.cold.get(key)
672    }
673
674    /// Promote hot entries to cold storage
675    pub fn promote(&self) {
676        let entries = self.hot.drain();
677
678        if !entries.is_empty() {
679            self.cold.insert_batch(entries);
680            self.insert_count.store(0, AtomicOrdering::Release);
681        }
682    }
683
684    /// Get hot tier size
685    pub fn hot_size(&self) -> usize {
686        self.hot.len()
687    }
688}
689
/// Trait for cold storage backends.
///
/// Implementors must be `Send + Sync`; both methods take `&self`, so any
/// mutation has to go through interior mutability.
pub trait ColdStorage<K, V>: Send + Sync {
    /// Get a value, or `None` if the backend has no entry for `key`
    fn get(&self, key: &K) -> Option<V>;

    /// Insert a batch of entries, taking ownership of the batch
    fn insert_batch(&self, entries: Vec<(K, V)>);
}
698
699// ============================================================================
700// Simple In-Memory Cold Storage (for testing)
701// ============================================================================
702
703/// Simple hashmap-based cold storage
704pub struct HashMapCold<K, V> {
705    data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
706}
707
708impl<K, V> HashMapCold<K, V>
709where
710    K: Eq + std::hash::Hash + Clone,
711{
712    /// Create new cold storage
713    pub fn new() -> Self {
714        Self {
715            data: parking_lot::RwLock::new(std::collections::HashMap::new()),
716        }
717    }
718}
719
720impl<K, V> Default for HashMapCold<K, V>
721where
722    K: Eq + std::hash::Hash + Clone,
723{
724    fn default() -> Self {
725        Self::new()
726    }
727}
728
729impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
730where
731    K: Eq + std::hash::Hash + Clone + Send + Sync,
732    V: Clone + Send + Sync,
733{
734    fn get(&self, key: &K) -> Option<V> {
735        self.data.read().get(key).cloned()
736    }
737
738    fn insert_batch(&self, entries: Vec<(K, V)>) {
739        let mut data = self.data.write();
740        for (k, v) in entries {
741            data.insert(k, v);
742        }
743    }
744}
745
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Distinct keys are inserted as new nodes and can be read back; a key
    /// that was never inserted yields `None`.
    #[test]
    fn test_skiplist_basic() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        assert!(list.insert(1, "one".to_string()).is_none());
        assert!(list.insert(2, "two".to_string()).is_none());
        assert!(list.insert(3, "three".to_string()).is_none());

        assert_eq!(list.len(), 3);
        assert_eq!(list.get(&1), Some("one".to_string()));
        assert_eq!(list.get(&2), Some("two".to_string()));
        assert_eq!(list.get(&3), Some("three".to_string()));
        assert_eq!(list.get(&4), None);
    }

    /// Re-inserting an existing key replaces the value, returns the previous
    /// one and leaves the element count unchanged.
    #[test]
    fn test_skiplist_update() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        assert!(list.insert(1, "one".to_string()).is_none());
        assert_eq!(list.insert(1, "ONE".to_string()), Some("one".to_string()));

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&1), Some("ONE".to_string()));
    }

    /// Four threads insert disjoint key ranges concurrently; every insert
    /// must be counted and the sampled keys must be readable afterwards.
    #[test]
    fn test_skiplist_concurrent() {
        // Capacity is far above the 4000 inserts, so the count never reaches
        // the promotion check's threshold during this test.
        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100000));
        let mut handles = vec![];

        for t in 0..4 {
            let list_clone = list.clone();
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let key = t * 1000 + i;
                    list_clone.insert(key, key * 2);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(list.len(), 4000);

        // Verify some values
        assert_eq!(list.get(&0), Some(0));
        assert_eq!(list.get(&1000), Some(2000));
        assert_eq!(list.get(&2000), Some(4000));
    }

    /// `drain` returns every entry in ascending key order.
    #[test]
    fn test_skiplist_drain() {
        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();

        for i in 0..100 {
            list.insert(i, i * 2);
        }

        let entries = list.drain();
        assert_eq!(entries.len(), 100);

        // Should be sorted
        for (i, (k, v)) in entries.iter().enumerate() {
            assert_eq!(*k, i as i32);
            assert_eq!(*v, (i * 2) as i32);
        }
    }

    /// `force_promote` returns the buffered entries and records one
    /// promotion of 50 entries in the statistics.
    #[test]
    fn test_batch_promoter() {
        let promoter = BatchPromoter::<i32, i32>::new(100);

        for i in 0..50 {
            promoter.insert_hot(i, i * 2);
        }

        assert_eq!(promoter.hot_size(), 50);
        assert_eq!(promoter.get(&10), Some(20));

        let promoted = promoter.force_promote();
        assert_eq!(promoted.len(), 50);

        let stats = promoter.stats();
        assert_eq!(stats.promotion_count, 1);
        assert_eq!(stats.entries_promoted, 50);
    }

    /// Values stay readable through `DeferredIndex::get` before and after
    /// the insert count crosses the promotion threshold of 10.
    #[test]
    fn test_deferred_index() {
        let cold = HashMapCold::<i32, i32>::new();
        let index = DeferredIndex::new(cold, 10);

        // Insert hot keys
        for i in 0..5 {
            index.insert(i, i * 10);
        }

        // Get from hot tier
        assert_eq!(index.get(&3), Some(30));

        // Insert more to trigger promotion
        for i in 5..15 {
            index.insert(i, i * 10);
        }

        // After promotion, should still find values
        assert_eq!(index.get(&0), Some(0)); // Was promoted
        assert_eq!(index.get(&12), Some(120)); // Was promoted or still hot
    }

    /// Repeated writes to one key are absorbed into a single entry.
    #[test]
    fn test_hot_key_absorption() {
        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();

        // Simulate hot key pattern
        for _ in 0..100 {
            list.insert("hot_key".to_string(), 42);
        }

        // Should only have one entry (absorbed)
        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&"hot_key".to_string()), Some(42));
    }
}