1use std::cmp::Ordering;
45use std::ptr;
46use std::sync::Arc;
47use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
48
/// Number of forward-pointer slots every tower carries, and the largest
/// height `random_height` can return.
const MAX_HEIGHT: usize = 16;

/// Divisor used by `random_height`: a tower grows one more level only when a
/// pseudo-random draw is divisible by `P`, i.e. with probability 1 in 4.
const P: u32 = 4;

/// Capacity passed to `with_capacity` by `StratifiedSkipList::new`.
const DEFAULT_HOT_CAPACITY: usize = 4096;
57
/// Per-node array of forward pointers, one slot per skip-list level.
#[repr(C)]
struct Tower<K, V> {
    /// Height passed to `Tower::new` for this node.
    height: usize,
    /// Successor pointer for each level; a null pointer means "no successor".
    /// All `MAX_HEIGHT` slots exist regardless of `height`.
    next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
}
70
71impl<K, V> Tower<K, V> {
72 fn new(height: usize) -> Self {
74 let mut next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT] =
75 std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
76
77 for ptr in next.iter_mut().take(height) {
79 *ptr = AtomicPtr::new(ptr::null_mut());
80 }
81
82 Self { height, next }
83 }
84
85 #[inline]
87 fn get(&self, level: usize) -> *mut SkipNode<K, V> {
88 self.next[level].load(AtomicOrdering::Acquire)
89 }
90
91 #[inline]
93 fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
94 self.next[level].store(node, AtomicOrdering::Release);
95 }
96
97 #[inline]
99 fn cas(
100 &self,
101 level: usize,
102 expected: *mut SkipNode<K, V>,
103 new: *mut SkipNode<K, V>,
104 ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
105 self.next[level].compare_exchange(
106 expected,
107 new,
108 AtomicOrdering::AcqRel,
109 AtomicOrdering::Acquire,
110 )
111 }
112}
113
/// A single skip-list entry: an immutable key, an atomically swappable value
/// and the node's forward pointers.
#[repr(C)]
struct SkipNode<K, V> {
    /// Key this node is ordered by; set once in `SkipNode::new`.
    key: K,
    /// Pointer to the boxed current value; replaced by `update_value`.
    value: AtomicPtr<V>,
    /// Starts at 1 and is incremented by every `update_value` call.
    version: AtomicU64,
    /// Forward pointers for each level this node participates in.
    tower: Tower<K, V>,
}
126
127impl<K, V> SkipNode<K, V> {
128 fn new(key: K, value: V, height: usize) -> *mut Self {
130 let value_ptr = Box::into_raw(Box::new(value));
131 let node = Box::new(Self {
132 key,
133 value: AtomicPtr::new(value_ptr),
134 version: AtomicU64::new(1),
135 tower: Tower::new(height),
136 });
137 Box::into_raw(node)
138 }
139
140 #[inline]
142 unsafe fn get_value(&self) -> &V {
143 unsafe { &*self.value.load(AtomicOrdering::Acquire) }
144 }
145
146 #[inline]
148 unsafe fn update_value(&self, new_value: V) -> V {
149 let new_ptr = Box::into_raw(Box::new(new_value));
150 let old_ptr = self.value.swap(new_ptr, AtomicOrdering::AcqRel);
151 self.version.fetch_add(1, AtomicOrdering::Release);
152 unsafe { *Box::from_raw(old_ptr) }
153 }
154}
155
156fn random_height() -> usize {
162 let mut height = 1;
163 let mut state = (&height as *const _ as u64)
165 .wrapping_mul(0x9E3779B97F4A7C15)
166 .wrapping_add(
167 std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .map(|d| d.as_nanos() as u64)
170 .unwrap_or(0),
171 );
172
173 while height < MAX_HEIGHT {
174 state = state.wrapping_mul(1103515245).wrapping_add(12345);
175 if (state >> 17) % (P as u64) != 0 {
176 break;
177 }
178 height += 1;
179 }
180 height
181}
182
/// Ordered key/value buffer built on a skip list whose links are updated with
/// atomic compare-and-swap operations.
pub struct StratifiedSkipList<K, V>
where
    K: Ord + Clone,
    V: Clone,
{
    /// Sentinel node allocated in `with_capacity` with a full-height tower;
    /// its key and value are defaults and are never compared or returned.
    head: *mut SkipNode<K, V>,
    /// Starts at 1 and is raised by `insert` to the tallest tower linked so far;
    /// `get` and `find_position` start their descent from this level.
    max_height: AtomicUsize,
    /// Incremented when `insert` links a new key; reset to 0 by `drain`.
    len: AtomicUsize,
    /// Once `len` reaches this value, `insert` calls `try_promote` first.
    capacity: usize,
    /// Optional callback that `try_promote` hands the drained entries to.
    promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
}
200
201impl<K, V> StratifiedSkipList<K, V>
202where
203 K: Ord + Clone + Default,
204 V: Clone + Default,
205{
206 pub fn new() -> Self {
208 Self::with_capacity(DEFAULT_HOT_CAPACITY)
209 }
210
211 pub fn with_capacity(capacity: usize) -> Self {
213 let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
215
216 Self {
217 head,
218 max_height: AtomicUsize::new(1),
219 len: AtomicUsize::new(0),
220 capacity,
221 promoter: None,
222 }
223 }
224
225 pub fn set_promoter<F>(&mut self, promoter: F)
227 where
228 F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
229 {
230 self.promoter = Some(Arc::new(promoter));
231 }
232
233 pub fn insert(&self, key: K, value: V) -> Option<V> {
237 if self.len() >= self.capacity {
239 self.try_promote();
240 }
241
242 let height = random_height();
243 let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
244 let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
245
246 loop {
247 self.find_position(&key, &mut prev, &mut next);
249
250 if !next[0].is_null() {
252 let existing = unsafe { &*next[0] };
253 if existing.key == key {
254 let old = unsafe { existing.update_value(value.clone()) };
256 return Some(old);
257 }
258 }
259
260 let new_node = SkipNode::new(key.clone(), value.clone(), height);
262
263 unsafe {
265 (*new_node).tower.set(0, next[0]);
266 }
267
268 let prev_ptr = if prev[0].is_null() {
270 self.head
271 } else {
272 prev[0]
273 };
274 match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
275 Ok(_) => {
276 for level in 1..height {
279 loop {
280 unsafe {
281 (*new_node).tower.set(level, next[level]);
282 }
283
284 let prev_at_level = if prev[level].is_null() {
285 self.head
286 } else {
287 prev[level]
288 };
289 if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }
290 .is_ok()
291 {
292 break;
293 }
294
295 self.find_position(&key, &mut prev, &mut next);
297 }
298 }
299
300 loop {
302 let current_max = self.max_height.load(AtomicOrdering::Relaxed);
303 if height <= current_max {
304 break;
305 }
306 if self
307 .max_height
308 .compare_exchange_weak(
309 current_max,
310 height,
311 AtomicOrdering::Release,
312 AtomicOrdering::Relaxed,
313 )
314 .is_ok()
315 {
316 break;
317 }
318 }
319
320 self.len.fetch_add(1, AtomicOrdering::Release);
321 return None;
322 }
323 Err(_) => {
324 unsafe {
327 let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
328 drop(Box::from_raw(value_ptr));
329 drop(Box::from_raw(new_node));
330 }
331 continue;
332 }
333 }
334 }
335 }
336
337 fn find_position(
339 &self,
340 key: &K,
341 prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
342 next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
343 ) {
344 let max_height = self.max_height.load(AtomicOrdering::Acquire);
345 let mut current = self.head;
346
347 for level in (0..max_height).rev() {
348 let mut succ = unsafe { (*current).tower.get(level) };
359 while !succ.is_null() {
360 let next_key = unsafe { &(*succ).key };
361 match next_key.cmp(key) {
362 Ordering::Less => {
363 current = succ;
364 succ = unsafe { (*current).tower.get(level) };
365 }
366 Ordering::Equal | Ordering::Greater => break,
367 }
368 }
369
370 prev[level] = if current == self.head {
371 ptr::null_mut()
372 } else {
373 current
374 };
375 next[level] = succ;
376 }
377 }
378
379 pub fn get(&self, key: &K) -> Option<V> {
381 let max_height = self.max_height.load(AtomicOrdering::Acquire);
382 let mut current = self.head;
383
384 for level in (0..max_height).rev() {
385 loop {
386 let next_node = unsafe { (*current).tower.get(level) };
387 if next_node.is_null() {
388 break;
389 }
390
391 let next_key = unsafe { &(*next_node).key };
392 match next_key.cmp(key) {
393 Ordering::Less => {
394 current = next_node;
395 }
396 Ordering::Equal => {
397 let value = unsafe { (*next_node).get_value().clone() };
398 return Some(value);
399 }
400 Ordering::Greater => {
401 break;
402 }
403 }
404 }
405 }
406
407 None
408 }
409
410 #[inline]
412 pub fn len(&self) -> usize {
413 self.len.load(AtomicOrdering::Acquire)
414 }
415
416 #[inline]
418 pub fn is_empty(&self) -> bool {
419 self.len() == 0
420 }
421
422 fn try_promote(&self) {
424 if let Some(ref promoter) = self.promoter {
425 let entries = self.drain();
426 if !entries.is_empty() {
427 promoter(entries);
428 }
429 }
430 }
431
432 pub fn drain(&self) -> Vec<(K, V)> {
434 let mut entries = Vec::with_capacity(self.len());
435 let mut current = unsafe { (*self.head).tower.get(0) };
436
437 while !current.is_null() {
438 let node = unsafe { &*current };
439 let key = node.key.clone();
440 let value = unsafe { node.get_value().clone() };
441 entries.push((key, value));
442 current = node.tower.get(0);
443 }
444
445 self.len.store(0, AtomicOrdering::Release);
448
449 entries
450 }
451
452 pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
454 SkipListIter {
455 current: unsafe { (*self.head).tower.get(0) },
456 _marker: std::marker::PhantomData,
457 }
458 }
459}
460
461impl<K, V> Default for StratifiedSkipList<K, V>
462where
463 K: Ord + Clone + Default,
464 V: Clone + Default,
465{
466 fn default() -> Self {
467 Self::new()
468 }
469}
470
471impl<K, V> Drop for StratifiedSkipList<K, V>
472where
473 K: Ord + Clone,
474 V: Clone,
475{
476 fn drop(&mut self) {
477 let mut current = unsafe { (*self.head).tower.get(0) };
478
479 while !current.is_null() {
480 let next = unsafe { (*current).tower.get(0) };
481 unsafe {
482 let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
483 drop(Box::from_raw(value_ptr));
484 drop(Box::from_raw(current));
485 }
486 current = next;
487 }
488
489 unsafe {
491 let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
492 drop(Box::from_raw(value_ptr));
493 drop(Box::from_raw(self.head));
494 }
495 }
496}
497
498unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Send
500 for StratifiedSkipList<K, V>
501{
502}
503unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Sync
504 for StratifiedSkipList<K, V>
505{
506}
507
/// Cursor over the bottom level of a skip list, yielding cloned entries.
struct SkipListIter<'a, K, V> {
    /// Next node to yield; null once the end of the list has been reached.
    current: *mut SkipNode<K, V>,
    /// Carries the lifetime `'a`; `StratifiedSkipList::iter` ties it to the
    /// borrow of the list.
    _marker: std::marker::PhantomData<&'a ()>,
}
513
514impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
515 type Item = (K, V);
516
517 fn next(&mut self) -> Option<Self::Item> {
518 if self.current.is_null() {
519 return None;
520 }
521
522 let node = unsafe { &*self.current };
523 let key = node.key.clone();
524 let value = unsafe { node.get_value().clone() };
525 self.current = node.tower.get(0);
526
527 Some((key, value))
528 }
529}
530
/// Counters describing the promotions performed by a `BatchPromoter`.
#[derive(Debug, Clone, Default)]
pub struct PromotionStats {
    /// Number of non-empty promotions recorded.
    pub promotion_count: u64,
    /// Total number of entries returned by those promotions.
    pub entries_promoted: u64,
    /// `entries_promoted / promotion_count`, refreshed on every promotion.
    pub avg_batch_size: f64,
}
545
/// Hot buffer plus bookkeeping for moving its contents out in batches.
pub struct BatchPromoter<K, V>
where
    K: Ord + Clone + Default,
    V: Clone + Default,
{
    /// Skip list that absorbs inserts until they are promoted.
    hot_buffer: StratifiedSkipList<K, V>,
    /// Created empty in `new` and not read by any method visible here.
    #[allow(dead_code)]
    pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
    /// Running promotion counters, updated by `force_promote`.
    stats: std::sync::Mutex<PromotionStats>,
    /// Always `None` in `new`; no background thread is started in this code.
    _background: Option<std::thread::JoinHandle<()>>,
}
562
563impl<K, V> BatchPromoter<K, V>
564where
565 K: Ord + Clone + Default + Send + Sync + 'static,
566 V: Clone + Default + Send + Sync + 'static,
567{
568 pub fn new(hot_capacity: usize) -> Arc<Self> {
570 let promoter = Arc::new(Self {
571 hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
572 pending: std::sync::Mutex::new(Vec::new()),
573 stats: std::sync::Mutex::new(PromotionStats::default()),
574 _background: None,
575 });
576
577 promoter
578 }
579
580 pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
582 self.hot_buffer.insert(key, value)
583 }
584
585 pub fn get(&self, key: &K) -> Option<V> {
587 self.hot_buffer.get(key)
588 }
589
590 pub fn force_promote(&self) -> Vec<(K, V)> {
592 let entries = self.hot_buffer.drain();
593
594 if !entries.is_empty() {
595 let mut stats = self.stats.lock().unwrap();
596 stats.promotion_count += 1;
597 stats.entries_promoted += entries.len() as u64;
598 stats.avg_batch_size = stats.entries_promoted as f64 / stats.promotion_count as f64;
599 }
600
601 entries
602 }
603
604 pub fn stats(&self) -> PromotionStats {
606 self.stats.lock().unwrap().clone()
607 }
608
609 pub fn hot_size(&self) -> usize {
611 self.hot_buffer.len()
612 }
613}
614
/// Two-tier index: inserts land in a hot skip list and are copied to `Cold`
/// storage in batches.
pub struct DeferredIndex<K, V, Cold>
where
    K: Ord + Clone + Default,
    V: Clone + Default,
{
    /// Hot tier; consulted first by `get`.
    hot: StratifiedSkipList<K, V>,
    /// Cold tier; receives batches from `promote` and answers misses of the hot tier.
    cold: Cold,
    /// Insert count at which `insert` triggers `promote`.
    promotion_threshold: usize,
    /// Inserts counted since the last non-empty promotion.
    insert_count: AtomicUsize,
}
634
635impl<K, V, Cold> DeferredIndex<K, V, Cold>
636where
637 K: Ord + Clone + Default,
638 V: Clone + Default,
639 Cold: ColdStorage<K, V>,
640{
641 pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
643 Self {
644 hot: StratifiedSkipList::with_capacity(promotion_threshold),
645 cold,
646 promotion_threshold,
647 insert_count: AtomicUsize::new(0),
648 }
649 }
650
651 pub fn insert(&self, key: K, value: V) -> Option<V> {
653 let count = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
654
655 if count >= self.promotion_threshold {
657 self.promote();
658 }
659
660 self.hot.insert(key, value)
661 }
662
663 pub fn get(&self, key: &K) -> Option<V> {
665 if let Some(value) = self.hot.get(key) {
667 return Some(value);
668 }
669
670 self.cold.get(key)
672 }
673
674 pub fn promote(&self) {
676 let entries = self.hot.drain();
677
678 if !entries.is_empty() {
679 self.cold.insert_batch(entries);
680 self.insert_count.store(0, AtomicOrdering::Release);
681 }
682 }
683
684 pub fn hot_size(&self) -> usize {
686 self.hot.len()
687 }
688}
689
/// Backing store that a `DeferredIndex` promotes batches into and reads from
/// when the hot tier misses.
pub trait ColdStorage<K, V>: Send + Sync {
    /// Returns the stored value for `key`, if any.
    fn get(&self, key: &K) -> Option<V>;

    /// Stores every `(key, value)` pair in `entries`.
    fn insert_batch(&self, entries: Vec<(K, V)>);
}
698
/// `ColdStorage` implementation backed by a `HashMap` behind a
/// `parking_lot::RwLock`.
pub struct HashMapCold<K, V> {
    /// The stored entries; read-locked by `get`, write-locked by `insert_batch`.
    data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
}
707
708impl<K, V> HashMapCold<K, V>
709where
710 K: Eq + std::hash::Hash + Clone,
711{
712 pub fn new() -> Self {
714 Self {
715 data: parking_lot::RwLock::new(std::collections::HashMap::new()),
716 }
717 }
718}
719
720impl<K, V> Default for HashMapCold<K, V>
721where
722 K: Eq + std::hash::Hash + Clone,
723{
724 fn default() -> Self {
725 Self::new()
726 }
727}
728
729impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
730where
731 K: Eq + std::hash::Hash + Clone + Send + Sync,
732 V: Clone + Send + Sync,
733{
734 fn get(&self, key: &K) -> Option<V> {
735 self.data.read().get(key).cloned()
736 }
737
738 fn insert_batch(&self, entries: Vec<(K, V)>) {
739 let mut data = self.data.write();
740 for (k, v) in entries {
741 data.insert(k, v);
742 }
743 }
744}
745
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// New keys insert as `None`, are counted and readable; a missing key reads as `None`.
    #[test]
    fn test_skiplist_basic() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();
        let words = [(1, "one"), (2, "two"), (3, "three")];

        for (key, word) in words.iter() {
            assert!(list.insert(*key, word.to_string()).is_none());
        }

        assert_eq!(list.len(), 3);
        for (key, word) in words.iter() {
            assert_eq!(list.get(key), Some(word.to_string()));
        }
        assert_eq!(list.get(&4), None);
    }

    /// Re-inserting a key returns the old value and leaves the length unchanged.
    #[test]
    fn test_skiplist_update() {
        let list = StratifiedSkipList::<i32, String>::new();

        let first = list.insert(1, "one".to_string());
        assert!(first.is_none());

        let replaced = list.insert(1, "ONE".to_string());
        assert_eq!(replaced, Some("one".to_string()));

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&1), Some("ONE".to_string()));
    }

    /// Four threads inserting disjoint key ranges all land in the list.
    #[test]
    fn test_skiplist_concurrent() {
        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100000));

        let workers: Vec<_> = (0..4)
            .map(|t| {
                let shared = Arc::clone(&list);
                thread::spawn(move || {
                    for key in (t * 1000)..(t * 1000 + 1000) {
                        shared.insert(key, key * 2);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(list.len(), 4000);

        for key in [0, 1000, 2000].iter() {
            assert_eq!(list.get(key), Some(key * 2));
        }
    }

    /// `drain` returns every entry in ascending key order.
    #[test]
    fn test_skiplist_drain() {
        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();

        (0..100).for_each(|i| {
            list.insert(i, i * 2);
        });

        let entries = list.drain();
        assert_eq!(entries.len(), 100);

        let expected: Vec<(i32, i32)> = (0..100).map(|i| (i, i * 2)).collect();
        assert_eq!(entries, expected);
    }

    /// A forced promotion returns the buffered entries and updates the stats.
    #[test]
    fn test_batch_promoter() {
        let promoter = BatchPromoter::<i32, i32>::new(100);

        (0..50).for_each(|i| {
            promoter.insert_hot(i, i * 2);
        });

        assert_eq!(promoter.hot_size(), 50);
        assert_eq!(promoter.get(&10), Some(20));

        let promoted = promoter.force_promote();
        assert_eq!(promoted.len(), 50);

        let PromotionStats {
            promotion_count,
            entries_promoted,
            ..
        } = promoter.stats();
        assert_eq!(promotion_count, 1);
        assert_eq!(entries_promoted, 50);
    }

    /// Keys stay readable through the index before and after a promotion.
    #[test]
    fn test_deferred_index() {
        let index = DeferredIndex::new(HashMapCold::<i32, i32>::new(), 10);

        // Below the threshold: served from the hot tier.
        for i in 0..5 {
            index.insert(i, i * 10);
        }
        assert_eq!(index.get(&3), Some(30));

        // Crossing the threshold triggers a promotion part-way through.
        for i in 5..15 {
            index.insert(i, i * 10);
        }

        assert_eq!(index.get(&0), Some(0));
        assert_eq!(index.get(&12), Some(120));
    }

    /// Repeated writes to one key occupy a single entry.
    #[test]
    fn test_hot_key_absorption() {
        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();

        let mut remaining = 100;
        while remaining > 0 {
            list.insert("hot_key".to_string(), 42);
            remaining -= 1;
        }

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&"hot_key".to_string()), Some(42));
    }
}