1use std::cmp::Ordering;
45use std::ptr;
46use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
47use std::sync::Arc;
48
/// Number of forward-pointer slots in every tower; also the upper bound on the
/// height returned by `random_height`.
const MAX_HEIGHT: usize = 16;

/// Promotion modulus used by `random_height`: a tower grows by one more level only
/// while the next pseudo-random draw is divisible by `P` (roughly a 1-in-`P` chance).
const P: u32 = 4;

/// Capacity that `StratifiedSkipList::new` passes to `with_capacity`.
const DEFAULT_HOT_CAPACITY: usize = 4096;
57
58#[repr(C)]
64struct Tower<K, V> {
65 height: usize,
67 next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
69}
70
71impl<K, V> Tower<K, V> {
72 fn new(height: usize) -> Self {
74 let mut next: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT] =
75 std::array::from_fn(|_| AtomicPtr::new(ptr::null_mut()));
76
77 for ptr in next.iter_mut().take(height) {
79 *ptr = AtomicPtr::new(ptr::null_mut());
80 }
81
82 Self { height, next }
83 }
84
85 #[inline]
87 fn get(&self, level: usize) -> *mut SkipNode<K, V> {
88 self.next[level].load(AtomicOrdering::Acquire)
89 }
90
91 #[inline]
93 fn set(&self, level: usize, node: *mut SkipNode<K, V>) {
94 self.next[level].store(node, AtomicOrdering::Release);
95 }
96
97 #[inline]
99 fn cas(
100 &self,
101 level: usize,
102 expected: *mut SkipNode<K, V>,
103 new: *mut SkipNode<K, V>,
104 ) -> Result<*mut SkipNode<K, V>, *mut SkipNode<K, V>> {
105 self.next[level]
106 .compare_exchange(expected, new, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
107 }
108}
109
110#[repr(C)]
112struct SkipNode<K, V> {
113 key: K,
115 value: AtomicPtr<V>,
117 version: AtomicU64,
119 tower: Tower<K, V>,
121}
122
123impl<K, V> SkipNode<K, V> {
124 fn new(key: K, value: V, height: usize) -> *mut Self {
126 let value_ptr = Box::into_raw(Box::new(value));
127 let node = Box::new(Self {
128 key,
129 value: AtomicPtr::new(value_ptr),
130 version: AtomicU64::new(1),
131 tower: Tower::new(height),
132 });
133 Box::into_raw(node)
134 }
135
136 #[inline]
138 unsafe fn get_value(&self) -> &V {
139 unsafe { &*self.value.load(AtomicOrdering::Acquire) }
140 }
141
142 #[inline]
144 unsafe fn update_value(&self, new_value: V) -> V {
145 let new_ptr = Box::into_raw(Box::new(new_value));
146 let old_ptr = self.value.swap(new_ptr, AtomicOrdering::AcqRel);
147 self.version.fetch_add(1, AtomicOrdering::Release);
148 unsafe { *Box::from_raw(old_ptr) }
149 }
150}
151
152fn random_height() -> usize {
158 let mut height = 1;
159 let mut state = (&height as *const _ as u64)
161 .wrapping_mul(0x9E3779B97F4A7C15)
162 .wrapping_add(
163 std::time::SystemTime::now()
164 .duration_since(std::time::UNIX_EPOCH)
165 .map(|d| d.as_nanos() as u64)
166 .unwrap_or(0),
167 );
168
169 while height < MAX_HEIGHT {
170 state = state.wrapping_mul(1103515245).wrapping_add(12345);
171 if (state >> 17) % (P as u64) != 0 {
172 break;
173 }
174 height += 1;
175 }
176 height
177}
178
179pub struct StratifiedSkipList<K, V>
181where
182 K: Ord + Clone,
183 V: Clone,
184{
185 head: *mut SkipNode<K, V>,
187 max_height: AtomicUsize,
189 len: AtomicUsize,
191 capacity: usize,
193 promoter: Option<Arc<dyn Fn(Vec<(K, V)>) + Send + Sync>>,
195}
196
197impl<K, V> StratifiedSkipList<K, V>
198where
199 K: Ord + Clone + Default,
200 V: Clone + Default,
201{
202 pub fn new() -> Self {
204 Self::with_capacity(DEFAULT_HOT_CAPACITY)
205 }
206
207 pub fn with_capacity(capacity: usize) -> Self {
209 let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
211
212 Self {
213 head,
214 max_height: AtomicUsize::new(1),
215 len: AtomicUsize::new(0),
216 capacity,
217 promoter: None,
218 }
219 }
220
221 pub fn set_promoter<F>(&mut self, promoter: F)
223 where
224 F: Fn(Vec<(K, V)>) + Send + Sync + 'static,
225 {
226 self.promoter = Some(Arc::new(promoter));
227 }
228
229 pub fn insert(&self, key: K, value: V) -> Option<V> {
233 if self.len() >= self.capacity {
235 self.try_promote();
236 }
237
238 let height = random_height();
239 let mut prev = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
240 let mut next = [ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
241
242 loop {
243 self.find_position(&key, &mut prev, &mut next);
245
246 if !next[0].is_null() {
248 let existing = unsafe { &*next[0] };
249 if existing.key == key {
250 let old = unsafe { existing.update_value(value.clone()) };
252 return Some(old);
253 }
254 }
255
256 let new_node = SkipNode::new(key.clone(), value.clone(), height);
258
259 unsafe {
261 (*new_node).tower.set(0, next[0]);
262 }
263
264 let prev_ptr = if prev[0].is_null() { self.head } else { prev[0] };
266 match unsafe { (*prev_ptr).tower.cas(0, next[0], new_node) } {
267 Ok(_) => {
268 for level in 1..height {
271 loop {
272 unsafe {
273 (*new_node).tower.set(level, next[level]);
274 }
275
276 let prev_at_level = if prev[level].is_null() { self.head } else { prev[level] };
277 if unsafe { (*prev_at_level).tower.cas(level, next[level], new_node) }.is_ok() {
278 break;
279 }
280
281 self.find_position(&key, &mut prev, &mut next);
283 }
284 }
285
286 loop {
288 let current_max = self.max_height.load(AtomicOrdering::Relaxed);
289 if height <= current_max {
290 break;
291 }
292 if self.max_height
293 .compare_exchange_weak(current_max, height, AtomicOrdering::Release, AtomicOrdering::Relaxed)
294 .is_ok()
295 {
296 break;
297 }
298 }
299
300 self.len.fetch_add(1, AtomicOrdering::Release);
301 return None;
302 }
303 Err(_) => {
304 unsafe {
307 let value_ptr = (*new_node).value.load(AtomicOrdering::Relaxed);
308 drop(Box::from_raw(value_ptr));
309 drop(Box::from_raw(new_node));
310 }
311 continue;
312 }
313 }
314 }
315 }
316
317 fn find_position(
319 &self,
320 key: &K,
321 prev: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
322 next: &mut [*mut SkipNode<K, V>; MAX_HEIGHT],
323 ) {
324 let max_height = self.max_height.load(AtomicOrdering::Acquire);
325 let mut current = self.head;
326
327 for level in (0..max_height).rev() {
328 loop {
329 let next_node = unsafe { (*current).tower.get(level) };
330 if next_node.is_null() {
331 break;
332 }
333
334 let next_key = unsafe { &(*next_node).key };
335 match next_key.cmp(key) {
336 Ordering::Less => {
337 current = next_node;
338 }
339 Ordering::Equal | Ordering::Greater => {
340 break;
341 }
342 }
343 }
344
345 prev[level] = if current == self.head { ptr::null_mut() } else { current };
346 next[level] = unsafe { (*current).tower.get(level) };
347 }
348 }
349
350 pub fn get(&self, key: &K) -> Option<V> {
352 let max_height = self.max_height.load(AtomicOrdering::Acquire);
353 let mut current = self.head;
354
355 for level in (0..max_height).rev() {
356 loop {
357 let next_node = unsafe { (*current).tower.get(level) };
358 if next_node.is_null() {
359 break;
360 }
361
362 let next_key = unsafe { &(*next_node).key };
363 match next_key.cmp(key) {
364 Ordering::Less => {
365 current = next_node;
366 }
367 Ordering::Equal => {
368 let value = unsafe { (*next_node).get_value().clone() };
369 return Some(value);
370 }
371 Ordering::Greater => {
372 break;
373 }
374 }
375 }
376 }
377
378 None
379 }
380
381 #[inline]
383 pub fn len(&self) -> usize {
384 self.len.load(AtomicOrdering::Acquire)
385 }
386
387 #[inline]
389 pub fn is_empty(&self) -> bool {
390 self.len() == 0
391 }
392
393 fn try_promote(&self) {
395 if let Some(ref promoter) = self.promoter {
396 let entries = self.drain();
397 if !entries.is_empty() {
398 promoter(entries);
399 }
400 }
401 }
402
403 pub fn drain(&self) -> Vec<(K, V)> {
405 let mut entries = Vec::with_capacity(self.len());
406 let mut current = unsafe { (*self.head).tower.get(0) };
407
408 while !current.is_null() {
409 let node = unsafe { &*current };
410 let key = node.key.clone();
411 let value = unsafe { node.get_value().clone() };
412 entries.push((key, value));
413 current = node.tower.get(0);
414 }
415
416 self.len.store(0, AtomicOrdering::Release);
419
420 entries
421 }
422
423 pub fn iter(&self) -> impl Iterator<Item = (K, V)> + '_ {
425 SkipListIter {
426 current: unsafe { (*self.head).tower.get(0) },
427 _marker: std::marker::PhantomData,
428 }
429 }
430}
431
432impl<K, V> Default for StratifiedSkipList<K, V>
433where
434 K: Ord + Clone + Default,
435 V: Clone + Default,
436{
437 fn default() -> Self {
438 Self::new()
439 }
440}
441
442impl<K, V> Drop for StratifiedSkipList<K, V>
443where
444 K: Ord + Clone,
445 V: Clone,
446{
447 fn drop(&mut self) {
448 let mut current = unsafe { (*self.head).tower.get(0) };
449
450 while !current.is_null() {
451 let next = unsafe { (*current).tower.get(0) };
452 unsafe {
453 let value_ptr = (*current).value.load(AtomicOrdering::Relaxed);
454 drop(Box::from_raw(value_ptr));
455 drop(Box::from_raw(current));
456 }
457 current = next;
458 }
459
460 unsafe {
462 let value_ptr = (*self.head).value.load(AtomicOrdering::Relaxed);
463 drop(Box::from_raw(value_ptr));
464 drop(Box::from_raw(self.head));
465 }
466 }
467}
468
// SAFETY (NOTE(review)): the list holds a raw `*mut SkipNode` head pointer, so these
// impls are asserted by hand. Tower links and each node's value pointer are read and
// written through atomics, and both impls require `K` and `V` to be `Send + Sync`.
// `SkipNode::update_value` frees the previous value as soon as it is swapped out, so
// soundness for a reader racing an overwrite of the same key should be confirmed before
// relying on `Sync`.
unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Send for StratifiedSkipList<K, V> {}
unsafe impl<K: Send + Sync + Ord + Clone, V: Send + Sync + Clone> Sync for StratifiedSkipList<K, V> {}
472
473struct SkipListIter<'a, K, V> {
475 current: *mut SkipNode<K, V>,
476 _marker: std::marker::PhantomData<&'a ()>,
477}
478
479impl<'a, K: Clone, V: Clone> Iterator for SkipListIter<'a, K, V> {
480 type Item = (K, V);
481
482 fn next(&mut self) -> Option<Self::Item> {
483 if self.current.is_null() {
484 return None;
485 }
486
487 let node = unsafe { &*self.current };
488 let key = node.key.clone();
489 let value = unsafe { node.get_value().clone() };
490 self.current = node.tower.get(0);
491
492 Some((key, value))
493 }
494}
495
/// Counters describing the batches returned by `BatchPromoter::force_promote`.
#[derive(Debug, Clone, Default)]
pub struct PromotionStats {
    /// Number of non-empty batches produced so far.
    pub promotion_count: u64,
    /// Total number of entries across all batches.
    pub entries_promoted: u64,
    /// `entries_promoted / promotion_count`, recomputed after every non-empty batch.
    pub avg_batch_size: f64,
}
510
511pub struct BatchPromoter<K, V>
513where
514 K: Ord + Clone + Default,
515 V: Clone + Default,
516{
517 hot_buffer: StratifiedSkipList<K, V>,
519 #[allow(dead_code)]
521 pending: std::sync::Mutex<Vec<Vec<(K, V)>>>,
522 stats: std::sync::Mutex<PromotionStats>,
524 _background: Option<std::thread::JoinHandle<()>>,
526}
527
528impl<K, V> BatchPromoter<K, V>
529where
530 K: Ord + Clone + Default + Send + Sync + 'static,
531 V: Clone + Default + Send + Sync + 'static,
532{
533 pub fn new(hot_capacity: usize) -> Arc<Self> {
535 let promoter = Arc::new(Self {
536 hot_buffer: StratifiedSkipList::with_capacity(hot_capacity),
537 pending: std::sync::Mutex::new(Vec::new()),
538 stats: std::sync::Mutex::new(PromotionStats::default()),
539 _background: None,
540 });
541
542 promoter
543 }
544
545 pub fn insert_hot(&self, key: K, value: V) -> Option<V> {
547 self.hot_buffer.insert(key, value)
548 }
549
550 pub fn get(&self, key: &K) -> Option<V> {
552 self.hot_buffer.get(key)
553 }
554
555 pub fn force_promote(&self) -> Vec<(K, V)> {
557 let entries = self.hot_buffer.drain();
558
559 if !entries.is_empty() {
560 let mut stats = self.stats.lock().unwrap();
561 stats.promotion_count += 1;
562 stats.entries_promoted += entries.len() as u64;
563 stats.avg_batch_size = stats.entries_promoted as f64 / stats.promotion_count as f64;
564 }
565
566 entries
567 }
568
569 pub fn stats(&self) -> PromotionStats {
571 self.stats.lock().unwrap().clone()
572 }
573
574 pub fn hot_size(&self) -> usize {
576 self.hot_buffer.len()
577 }
578}
579
580pub struct DeferredIndex<K, V, Cold>
586where
587 K: Ord + Clone + Default,
588 V: Clone + Default,
589{
590 hot: StratifiedSkipList<K, V>,
592 cold: Cold,
594 promotion_threshold: usize,
596 insert_count: AtomicUsize,
598}
599
600impl<K, V, Cold> DeferredIndex<K, V, Cold>
601where
602 K: Ord + Clone + Default,
603 V: Clone + Default,
604 Cold: ColdStorage<K, V>,
605{
606 pub fn new(cold: Cold, promotion_threshold: usize) -> Self {
608 Self {
609 hot: StratifiedSkipList::with_capacity(promotion_threshold),
610 cold,
611 promotion_threshold,
612 insert_count: AtomicUsize::new(0),
613 }
614 }
615
616 pub fn insert(&self, key: K, value: V) -> Option<V> {
618 let count = self.insert_count.fetch_add(1, AtomicOrdering::Relaxed);
619
620 if count >= self.promotion_threshold {
622 self.promote();
623 }
624
625 self.hot.insert(key, value)
626 }
627
628 pub fn get(&self, key: &K) -> Option<V> {
630 if let Some(value) = self.hot.get(key) {
632 return Some(value);
633 }
634
635 self.cold.get(key)
637 }
638
639 pub fn promote(&self) {
641 let entries = self.hot.drain();
642
643 if !entries.is_empty() {
644 self.cold.insert_batch(entries);
645 self.insert_count.store(0, AtomicOrdering::Release);
646 }
647 }
648
649 pub fn hot_size(&self) -> usize {
651 self.hot.len()
652 }
653}
654
/// Backing store that `DeferredIndex` consults on a hot-tier miss and flushes batches into.
pub trait ColdStorage<K, V>: Send + Sync {
    /// Looks up `key` and returns an owned value when one is stored.
    fn get(&self, key: &K) -> Option<V>;

    /// Stores every `(key, value)` pair in `entries`.
    fn insert_batch(&self, entries: Vec<(K, V)>);
}
663
664pub struct HashMapCold<K, V> {
670 data: parking_lot::RwLock<std::collections::HashMap<K, V>>,
671}
672
673impl<K, V> HashMapCold<K, V>
674where
675 K: Eq + std::hash::Hash + Clone,
676{
677 pub fn new() -> Self {
679 Self {
680 data: parking_lot::RwLock::new(std::collections::HashMap::new()),
681 }
682 }
683}
684
685impl<K, V> Default for HashMapCold<K, V>
686where
687 K: Eq + std::hash::Hash + Clone,
688{
689 fn default() -> Self {
690 Self::new()
691 }
692}
693
694impl<K, V> ColdStorage<K, V> for HashMapCold<K, V>
695where
696 K: Eq + std::hash::Hash + Clone + Send + Sync,
697 V: Clone + Send + Sync,
698{
699 fn get(&self, key: &K) -> Option<V> {
700 self.data.read().get(key).cloned()
701 }
702
703 fn insert_batch(&self, entries: Vec<(K, V)>) {
704 let mut data = self.data.write();
705 for (k, v) in entries {
706 data.insert(k, v);
707 }
708 }
709}
710
/// Unit tests for the skip list and the promotion wrappers built on top of it.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Distinct keys can be inserted and read back; a missing key yields `None`.
    #[test]
    fn test_skiplist_basic() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        assert!(list.insert(1, "one".to_string()).is_none());
        assert!(list.insert(2, "two".to_string()).is_none());
        assert!(list.insert(3, "three".to_string()).is_none());

        assert_eq!(list.len(), 3);
        assert_eq!(list.get(&1), Some("one".to_string()));
        assert_eq!(list.get(&2), Some("two".to_string()));
        assert_eq!(list.get(&3), Some("three".to_string()));
        assert_eq!(list.get(&4), None);
    }

    /// Re-inserting a key returns the old value and does not change `len`.
    #[test]
    fn test_skiplist_update() {
        let list: StratifiedSkipList<i32, String> = StratifiedSkipList::new();

        assert!(list.insert(1, "one".to_string()).is_none());
        assert_eq!(list.insert(1, "ONE".to_string()), Some("one".to_string()));

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&1), Some("ONE".to_string()));
    }

    /// Four threads insert disjoint key ranges; every insert must be counted and readable.
    #[test]
    fn test_skiplist_concurrent() {
        // Capacity is far above the 4000 inserts, so the promotion path is not exercised.
        let list = Arc::new(StratifiedSkipList::<i32, i32>::with_capacity(100000));
        let mut handles = vec![];

        for t in 0..4 {
            let list_clone = list.clone();
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let key = t * 1000 + i;
                    list_clone.insert(key, key * 2);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(list.len(), 4000);

        // Spot-check the first key written by three of the threads.
        assert_eq!(list.get(&0), Some(0));
        assert_eq!(list.get(&1000), Some(2000));
        assert_eq!(list.get(&2000), Some(4000));
    }

    /// `drain` returns every entry in ascending key order.
    #[test]
    fn test_skiplist_drain() {
        let list: StratifiedSkipList<i32, i32> = StratifiedSkipList::new();

        for i in 0..100 {
            list.insert(i, i * 2);
        }

        let entries = list.drain();
        assert_eq!(entries.len(), 100);

        // Position `i` must hold key `i`, which pins the ordering.
        for (i, (k, v)) in entries.iter().enumerate() {
            assert_eq!(*k, i as i32);
            assert_eq!(*v, (i * 2) as i32);
        }
    }

    /// `force_promote` returns the buffered entries and updates the statistics.
    #[test]
    fn test_batch_promoter() {
        let promoter = BatchPromoter::<i32, i32>::new(100);

        for i in 0..50 {
            promoter.insert_hot(i, i * 2);
        }

        assert_eq!(promoter.hot_size(), 50);
        assert_eq!(promoter.get(&10), Some(20));

        let promoted = promoter.force_promote();
        assert_eq!(promoted.len(), 50);

        let stats = promoter.stats();
        assert_eq!(stats.promotion_count, 1);
        assert_eq!(stats.entries_promoted, 50);
    }

    /// Keys stay readable through `DeferredIndex::get` before and after a promotion.
    #[test]
    fn test_deferred_index() {
        let cold = HashMapCold::<i32, i32>::new();
        let index = DeferredIndex::new(cold, 10);

        // Five inserts stay below the threshold of 10.
        for i in 0..5 {
            index.insert(i, i * 10);
        }

        assert_eq!(index.get(&3), Some(30));

        // Ten more inserts push the insert counter to the threshold, so `promote` runs.
        for i in 5..15 {
            index.insert(i, i * 10);
        }

        assert_eq!(index.get(&0), Some(0));
        assert_eq!(index.get(&12), Some(120));
    }

    /// Repeated writes to one key keep a single entry holding the latest value.
    #[test]
    fn test_hot_key_absorption() {
        let list: StratifiedSkipList<String, i32> = StratifiedSkipList::new();

        for _ in 0..100 {
            list.insert("hot_key".to_string(), 42);
        }

        assert_eq!(list.len(), 1);
        assert_eq!(list.get(&"hot_key".to_string()), Some(42));
    }
}