1use std::cmp::Ordering;
42use std::ptr;
43use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
44use std::sync::Arc;
45
/// Number of link slots in every node's tower, and the height of the head sentinel.
const MAX_HEIGHT: usize = 16;

/// Inverse level-promotion probability used by `random_height`: each additional
/// tower level is taken with probability roughly `1 / P`.
const P: u32 = 4;

/// `len` threshold used by `StratifiedSkipList::new`; once reached, `insert` calls
/// `try_promote` (which only does work when a promoter has been installed).
const DEFAULT_HOT_CAPACITY: usize = 4096;
54
55#[repr(C)]
61struct Tower<K, V> {
62 height: usize,
64 next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
66}
67
68impl<K, V> Tower<K, V> {
69 fn new(height: usize) -> Self {
71 let mut next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT] =
72 std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
73
74 for ptr in next.iter_mut().take(height) {
76 *ptr = AtomicPtr::new(ptr::null_mut());
77 }
78
79 Self { height, next }
80 }
81
82 #[inline]
84 fn get(&self, level: usize) -> *mut SkipNode<K, V> {
85 self.next[level].load(AtomicOrdering::Acquire)
86 }
87
88 #[inline]
90 fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
91 self.next[level].store(node, AtomicOrdering::Release);
92 }
93
94 #[inline]
96 fn cas(
97 &self,
98 level: usize,
99 expected: *mut SkipNode<K, V>,
100 new: *mut SkipNode<K, V>,
101 ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
102 self.next[level]
103 .compare_exchange(expected, new, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
104 }
105}
106
107#[repr(C)]
109struct SkipNode<K, V> {
110 key: K,
112 value: AtomicPtr<V>,
114 version: AtomicU64,
116 tower: Tower<K, V>,
118}
119
120impl<K, V> SkipNode<K, V> {
121 fn new(key: K, value: V, height: usize) -> *mut Self {
123 let value_ptr = Box::into_raw(Box::new(value));
124 let node = Box::new(Self {
125 key,
126 value: AtomicPtr::new(value_ptr),
127 version: AtomicU64::new(1),
128 tower: Tower::new(height),
129 });
130 Box::into_raw(node)
131 }
132
133 #[inline]
135 unsafe fn get_value(&self) -> &V {
136 unsafe { &*self.value.load(AtomicOrdering::Acquire) }
137 }
138
139 #[inline]
141 unsafe fn update_value(&self, new_value: V) -> V {
142 let new_ptr = Box::into_raw(Box::new(new_value));
143 let old_ptr = self.value.swap(new_ptr, AtomicOrdering::AcqRel);
144 self.version.fetch_add(1, AtomicOrdering::Release);
145 unsafe { *Box::from_raw(old_ptr) }
146 }
147}
148
149fn random_height() -> usize {
155 let mut height = 1;
156 let mut state = (&height as *const _ as u64)
158 .wrapping_mul(0x9E3779B97F4A7C15)
159 .wrapping_add(
160 std::time::SystemTime::now()
161 .duration_since(std::time::UNIX_EPOCH)
162 .map(|d| d.as_nanos() as u64)
163 .unwrap_or(0),
164 );
165
166 while height < MAX_HEIGHT {
167 state = state.wrapping_mul(1103515245).wrapping_add(12345);
168 if (state >> 17) % (P as u64) != 0 {
169 break;
170 }
171 height += 1;
172 }
173 height
174}
175
176pub struct StratifiedSkipList<K, V>
178where
179 K: Ord + Clone,
180 V: Clone,
181{
182 head: *mut SkipNode<K, V>,
184 max_height: AtomicUsize,
186 len: AtomicUsize,
188 capacity: usize,
190 promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
192}
193
194impl<K, V> StratifiedSkipList<K, V>
195where
196 K: Ord + Clone + Default,
197 V: Clone + Default,
198{
199 pub fn new() -> Self {
201 Self::with_capacity(DEFAULT_HOT_CAPACITY)
202 }
203
204 pub fn with_capacity(capacity: usize) -> Self {
206 let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
208
209 Self {
210 head,
211 max_height: AtomicUsize::new(1),
212 len: AtomicUsize::new(0),
213 capacity,
214 promoter: None,
215 }
216 }
217
218 pub fn set_promoter<F>(&mut self, promoter: F)
220 where
221 F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
222 {
223 self.promoter = Some(Arc::new(promoter));
224 }
225
226 pub fn insert(&self, key: K, value: V) -> Option<V> {
230 if self.len() >= self.capacity {
232 self.try_promote();
233 }
234
235 let height = random_height();
236 let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
237 let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
238
239 loop {
240 self.find_position(&key, &mut prev, &mut next);
242
243 if !next[0].is_null() {
245 let existing = unsafe { &*next[0] };
246 if existing.key == key {
247 let old = unsafe { existing.update_value(value.clone()) };
249 return Some(old);
250 }
251 }
252
253 let new_node = SkipNode::new(key.clone(), value.clone(), height);
255
256 unsafe {
258 (*new_node).tower.set(0, next[0]);
259 }
260
261 let prev_ptr = if prev[0].is_null() { self.head } else { prev[0] };
263 match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
264 Ok(_) => {
265 for level in 1..height {
268 loop {
269 unsafe {
270 (*new_node).tower.set(level, next[level]);
271 }
272
273 let prev_at_level = if prev[level].is_null() { self.head } else { prev[level] };
274 if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }.is_ok() {
275 break;
276 }
277
278 self.find_position(&key, &mut prev, &mut next);
280 }
281 }
282
283 loop {
285 let current_max = self.max_height.load(AtomicOrdering::Relaxed);
286 if height <= current_max {
287 break;
288 }
289 if self.max_height
290 .compare_exchange_weak(current_max, height, AtomicOrdering::Release, AtomicOrdering::Relaxed)
291 .is_ok()
292 {
293 break;
294 }
295 }
296
297 self.len.fetch_add(1, AtomicOrdering::Release);
298 return None;
299 }
300 Err(_) => {
301 unsafe {
304 let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
305 drop(Box::from_raw(value_ptr));
306 drop(Box::from_raw(new_node));
307 }
308 continue;
309 }
310 }
311 }
312 }
313
314 fn find_position(
316 &self,
317 key: &K,
318 prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
319 next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
320 ) {
321 let max_height = self.max_height.load(AtomicOrdering::Acquire);
322 let mut current = self.head;
323
324 for level in (0..max_height).rev() {
325 loop {
326 let next_node = unsafe { (*current).tower.get(level) };
327 if next_node.is_null() {
328 break;
329 }
330
331 let next_key = unsafe { &(*next_node).key };
332 match next_key.cmp(key) {
333 Ordering::Less => {
334 current = next_node;
335 }
336 Ordering::Equal | Ordering::Greater => {
337 break;
338 }
339 }
340 }
341
342 prev[level] = if current == self.head { ptr::null_mut() } else { current };
343 next[level] = unsafe { (*current).tower.get(level) };
344 }
345 }
346
347 pub fn get(&self, key: &K) -> Option<V> {
349 let max_height = self.max_height.load(AtomicOrdering::Acquire);
350 let mut current = self.head;
351
352 for level in (0..max_height).rev() {
353 loop {
354 let next_node = unsafe { (*current).tower.get(level) };
355 if next_node.is_null() {
356 break;
357 }
358
359 let next_key = unsafe { &(*next_node).key };
360 match next_key.cmp(key) {
361 Ordering::Less => {
362 current = next_node;
363 }
364 Ordering::Equal => {
365 let value = unsafe { (*next_node).get_value().clone() };
366 return Some(value);
367 }
368 Ordering::Greater => {
369 break;
370 }
371 }
372 }
373 }
374
375 None
376 }
377
378 #[inline]
380 pub fn len(&self) -> usize {
381 self.len.load(AtomicOrdering::Acquire)
382 }
383
384 #[inline]
386 pub fn is_empty(&self) -> bool {
387 self.len() == 0
388 }
389
390 fn try_promote(&self) {
392 if let Some(ref promoter) = self.promoter {
393 let entries = self.drain();
394 if !entries.is_empty() {
395 promoter(entries);
396 }
397 }
398 }
399
400 pub fn drain(&self) -> Vec<(K, V)> {
402 let mut entries = Vec::with_capacity(self.len());
403 let mut current = unsafe { (*self.head).tower.get(0) };
404
405 while !current.is_null() {
406 let node = unsafe { &*current };
407 let key = node.key.clone();
408 let value = unsafe { node.get_value().clone() };
409 entries.push((key, value));
410 current = node.tower.get(0);
411 }
412
413 self.len.store(0, AtomicOrdering::Release);
416
417 entries
418 }
419
420 pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
422 SkipListIter {
423 current: unsafe { (*self.head).tower.get(0) },
424 _marker: std::marker::PhantomData,
425 }
426 }
427}
428
429impl<K, V> Default for StratifiedSkipList<K, V>
430where
431 K: Ord + Clone + Default,
432 V: Clone + Default,
433{
434 fn default() -> Self {
435 Self::new()
436 }
437}
438
439impl<K, V> Drop for StratifiedSkipList<K, V>
440where
441 K: Ord + Clone,
442 V: Clone,
443{
444 fn drop(&mut self) {
445 let mut current = unsafe { (*self.head).tower.get(0) };
446
447 while !current.is_null() {
448 let next = unsafe { (*current).tower.get(0) };
449 unsafe {
450 let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
451 drop(Box::from_raw(value_ptr));
452 drop(Box::from_raw(current));
453 }
454 current = next;
455 }
456
457 unsafe {
459 let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
460 drop(Box::from_raw(value_ptr));
461 drop(Box::from_raw(self.head));
462 }
463 }
464}
465
// SAFETY: the list owns its nodes (and their keys/values) through raw pointers
// (`head: *mut SkipNode`), which is why `Send`/`Sync` are not derived automatically.
// Moving the list hands its K and V values to another thread. Sharing it lets several
// threads read keys, clone values, and insert owned K/V values concurrently. Both
// impls therefore conservatively require `Send + Sync` for K and V.
// NOTE(review): `Sync` is only sound if the value-update path never frees a value
// that another thread is still reading; confirm against `SkipNode::update_value`.
unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Send for StratifiedSkipList<K, V> {}
unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Sync for StratifiedSkipList<K, V> {}
469
470struct SkipListIter<'a, K, V> {
472 current: *mut SkipNode<K, V>,
473 _marker: std::marker::PhantomData<&'a ()>,
474}
475
476impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
477 type Item = (K, V);
478
479 fn next(&mut self) -> Option<Self::Item> {
480 if self.current.is_null() {
481 return None;
482 }
483
484 let node = unsafe { &*self.current };
485 let key = node.key.clone();
486 let value = unsafe { node.get_value().clone() };
487 self.current = node.tower.get(0);
488
489 Some((key, value))
490 }
491}
492
/// Cumulative promotion counters maintained by `BatchPromoter::force_promote`.
#[derive(Debug, Clone, Default)]
pub struct PromotionStats {
    /// Number of promotions that returned at least one entry.
    pub promotion_count: u64,
    /// Total number of entries returned across those promotions.
    pub entries_promoted: u64,
    /// `entries_promoted / promotion_count`, recomputed after each counted promotion
    /// (0.0 before the first one).
    pub avg_batch_size: f64,
}
507
508pub struct BatchPromoter<K, V>
510where
511 K: Ord + Clone + Default,
512 V: Clone + Default,
513{
514 hot_buffer: StratifiedSkipList<K, V>,
516 #[allow(dead_code)]
518 pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
519 stats: std::sync::Mutex<PromotionStats>,
521 _background: Option<std::thread::JoinHandle<()>>,
523}
524
525impl<K, V> BatchPromoter<K, V>
526where
527 K: Ord + Clone + Default + Send + Sync + 'static,
528 V: Clone + Default + Send + Sync + 'static,
529{
530 pub fn new(hot_capacity: usize) -> Arc<Self> {
532 let promoter = Arc::new(Self {
533 hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
534 pending: std::sync::Mutex::new(Vec::new()),
535 stats: std::sync::Mutex::new(PromotionStats::default()),
536 _background: None,
537 });
538
539 promoter
540 }
541
542 pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
544 self.hot_buffer.insert(key, value)
545 }
546
547 pub fn get(&self, key: &K) -> Option<V> {
549 self.hot_buffer.get(key)
550 }
551
552 pub fn force_promote(&self) -> Vec<(K, V)> {
554 let entries = self.hot_buffer.drain();
555
556 if !entries.is_empty() {
557 let mut stats = self.stats.lock().unwrap();
558 stats.promotion_count += 1;
559 stats.entries_promoted += entries.len() as u64;
560 stats.avg_batch_size = stats.entries_promoted as f64 / stats.promotion_count as f64;
561 }
562
563 entries
564 }
565
566 pub fn stats(&self) -> PromotionStats {
568 self.stats.lock().unwrap().clone()
569 }
570
571 pub fn hot_size(&self) -> usize {
573 self.hot_buffer.len()
574 }
575}
576
577pub struct DeferredIndex<K, V, Cold>
583where
584 K: Ord + Clone + Default,
585 V: Clone + Default,
586{
587 hot: StratifiedSkipList<K, V>,
589 cold: Cold,
591 promotion_threshold: usize,
593 insert_count: AtomicUsize,
595}
596
597impl<K, V, Cold> DeferredIndex<K, V, Cold>
598where
599 K: Ord + Clone + Default,
600 V: Clone + Default,
601 Cold: ColdStorage<K, V>,
602{
603 pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
605 Self {
606 hot: StratifiedSkipList::with_capacity(promotion_threshold),
607 cold,
608 promotion_threshold,
609 insert_count: AtomicUsize::new(0),
610 }
611 }
612
613 pub fn insert(&self, key: K, value: V) -> Option<V> {
615 let count = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
616
617 if count >= self.promotion_threshold {
619 self.promote();
620 }
621
622 self.hot.insert(key, value)
623 }
624
625 pub fn get(&self, key: &K) -> Option<V> {
627 if let Some(value) = self.hot.get(key) {
629 return Some(value);
630 }
631
632 self.cold.get(key)
634 }
635
636 pub fn promote(&self) {
638 let entries = self.hot.drain();
639
640 if !entries.is_empty() {
641 self.cold.insert_batch(entries);
642 self.insert_count.store(0, AtomicOrdering::Release);
643 }
644 }
645
646 pub fn hot_size(&self) -> usize {
648 self.hot.len()
649 }
650}
651
/// Backing store that receives entries promoted out of a hot tier (see `DeferredIndex`).
///
/// Implementors must be `Send + Sync` (supertraits).
pub trait ColdStorage<K, V>: Send + Sync {
    /// Looks up `key`, returning an owned value if present.
    fn get(&self, key: &K) -> Option<V>;

    /// Stores every `(key, value)` pair of a promotion batch.
    /// NOTE(review): behaviour for keys already present is implementation-defined
    /// (`HashMapCold` overwrites them).
    fn insert_batch(&self, entries: Vec<(K, V)>);
}
660
661pub struct HashMapCold<K, V> {
667 data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
668}
669
670impl<K, V> HashMapCold<K, V>
671where
672 K: Eq + std::hash::Hash + Clone,
673{
674 pub fn new() -> Self {
676 Self {
677 data: parking_lot::RwLock::new(std::collections::HashMap::new()),
678 }
679 }
680}
681
682impl<K, V> Default for HashMapCold<K, V>
683where
684 K: Eq + std::hash::Hash + Clone,
685{
686 fn default() -> Self {
687 Self::new()
688 }
689}
690
691impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
692where
693 K: Eq + std::hash::Hash + Clone + Send + Sync,
694 V: Clone + Send + Sync,
695{
696 fn get(&self, key: &K) -> Option<V> {
697 self.data.read().get(key).cloned()
698 }
699
700 fn insert_batch(&self, entries: Vec<(K, V)>) {
701 let mut data = self.data.write();
702 for (k, v) in entries {
703 data.insert(k, v);
704 }
705 }
706}
707
// Unit tests for the hot skip list, the batch promoter and the two-tier index.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    // Distinct keys: each insert returns None and every key is retrievable afterwards.
    #[test]
    fn test_skiplist_basic() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        assert!(list.insert(1, "one".to_string()).is_none());
        assert!(list.insert(2, "two".to_string()).is_none());
        assert!(list.insert(3, "three".to_string()).is_none());

        assert_eq!(list.len(), 3);
        assert_eq!(list.get(&1), Some("one".to_string()));
        assert_eq!(list.get(&2), Some("two".to_string()));
        assert_eq!(list.get(&3), Some("three".to_string()));
        assert_eq!(list.get(&4), None);
    }

    // Re-inserting an existing key returns the old value and does not grow `len`.
    #[test]
    fn test_skiplist_update() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        assert!(list.insert(1, "one".to_string()).is_none());
        assert_eq!(list.insert(1, "ONE".to_string()), Some("one".to_string()));

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&1), Some("ONE".to_string()));
    }

    // Four threads insert disjoint key ranges; every insert must be counted and
    // visible. The capacity (100000) is far above 4000, so no promotion is attempted.
    #[test]
    fn test_skiplist_concurrent() {
        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100000));
        let mut handles = vec![];

        for t in 0..4 {
            let list_clone = list.clone();
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let key = t * 1000 + i;
                    list_clone.insert(key, key * 2);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(list.len(), 4000);

        assert_eq!(list.get(&0), Some(0));
        assert_eq!(list.get(&1000), Some(2000));
        assert_eq!(list.get(&2000), Some(4000));
    }

    // `drain` returns every entry, in ascending key order.
    #[test]
    fn test_skiplist_drain() {
        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();

        for i in 0..100 {
            list.insert(i, i * 2);
        }

        let entries = list.drain();
        assert_eq!(entries.len(), 100);

        for (i, (k, v)) in entries.iter().enumerate() {
            assert_eq!(*k, i as i32);
            assert_eq!(*v, (i * 2) as i32);
        }
    }

    // `force_promote` returns the hot contents and records exactly one promotion.
    #[test]
    fn test_batch_promoter() {
        let promoter = BatchPromoter::<i32, i32>::new(100);

        for i in 0..50 {
            promoter.insert_hot(i, i * 2);
        }

        assert_eq!(promoter.hot_size(), 50);
        assert_eq!(promoter.get(&10), Some(20));

        let promoted = promoter.force_promote();
        assert_eq!(promoted.len(), 50);

        let stats = promoter.stats();
        assert_eq!(stats.promotion_count, 1);
        assert_eq!(stats.entries_promoted, 50);
    }

    // With a threshold of 10, the 11th `insert` call promotes the hot tier first.
    // Keys must stay reachable through one tier or the other afterwards.
    #[test]
    fn test_deferred_index() {
        let cold = HashMapCold::<i32, i32>::new();
        let index = DeferredIndex::new(cold, 10);

        for i in 0..5 {
            index.insert(i, i * 10);
        }

        assert_eq!(index.get(&3), Some(30));

        for i in 5..15 {
            index.insert(i, i * 10);
        }

        assert_eq!(index.get(&0), Some(0));
        assert_eq!(index.get(&12), Some(120));
    }

    // 100 writes to one key collapse into a single entry holding the last value.
    #[test]
    fn test_hot_key_absorption() {
        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();

        for _ in 0..100 {
            list.insert("hot_key".to_string(), 42);
        }

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&"hot_key".to_string()), Some(42));
    }
}