// rivven_core/concurrent.rs

//! Lock-Free Concurrent Data Structures
//!
//! High-performance concurrent data structures optimized for streaming workloads:
//!
//! - **LockFreeQueue**: MPMC queue with bounded/unbounded variants
//! - **AppendOnlyLog**: Lock-free append-only log for message batching
//! - **ConcurrentHashMap**: Read-optimized concurrent map with minimal locking
//! - **ConcurrentSkipList**: Lock-free ordered map for offset indexing
//!
//! # Design Principles
//!
//! 1. **Minimize Contention**: Use atomic operations over locks where possible
//! 2. **Cache-Friendly**: Align structures to cache lines to prevent false sharing
//! 3. **Memory Efficient**: Reuse and pool allocations to reduce allocator pressure
//! 4. **Backpressure-Aware**: Bounded structures with overflow policies
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────────────┐
//! │                    Concurrent Data Structures                            │
//! ├─────────────────────────────────────────────────────────────────────────┤
//! │                                                                          │
//! │  ┌────────────────────┐     ┌────────────────────┐                      │
//! │  │   LockFreeQueue    │     │   AppendOnlyLog    │                      │
//! │  │  ┌──┬──┬──┬──┬──┐  │     │  ┌────────────────┐│                      │
//! │  │  │H │  │  │  │T │  │     │  │ Segment 0      ││                      │
//! │  │  └──┴──┴──┴──┴──┘  │     │  │ Segment 1      ││                      │
//! │  │  MPMC lock-free    │     │  │ Segment N      ││                      │
//! │  └────────────────────┘     │  └────────────────┘│                      │
//! │                             │  Lock-free append  │                      │
//! │  ┌────────────────────┐     └────────────────────┘                      │
//! │  │   ConcurrentMap    │                                                 │
//! │  │  ┌──┬──┬──┬──┬──┐  │     ┌────────────────────┐                      │
//! │  │  │S0│S1│S2│S3│SN│  │     │     SkipList       │                      │
//! │  │  └──┴──┴──┴──┴──┘  │     │  Level 3: ──●──────│                      │
//! │  │  Sharded for perf  │     │  Level 2: ──●──●───│                      │
//! │  └────────────────────┘     │  Level 1: ●─●─●─●──│                      │
//! │                             │  O(log n) lookup   │                      │
//! │                             └────────────────────┘                      │
//! └─────────────────────────────────────────────────────────────────────────┘
//! ```

use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TryRecvError, TrySendError};
use parking_lot::RwLock;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

// ============================================================================
// Cache Line Alignment
// ============================================================================

/// Pad a value to cache line size to prevent false sharing
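///
/// A minimal usage sketch (illustrative only; assumes `CacheAligned` is in
/// scope): per-worker counters padded so that two counters never share a
/// cache line.
///
/// ```ignore
/// use std::sync::atomic::{AtomicU64, Ordering};
///
/// // One counter per worker, each on its own cache line.
/// let counters: Vec<CacheAligned<AtomicU64>> =
///     (0..4).map(|_| CacheAligned(AtomicU64::new(0))).collect();
///
/// counters[0].fetch_add(1, Ordering::Relaxed); // Deref exposes &AtomicU64
/// assert_eq!(counters[0].load(Ordering::Relaxed), 1);
/// ```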
#[repr(C, align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> std::ops::Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> std::ops::DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}

// ============================================================================
// Lock-Free MPMC Queue
// ============================================================================

/// A high-performance bounded MPMC queue using crossbeam-channel
///
/// This queue is optimized for producer-consumer patterns common in streaming:
/// - Network receive -> Message processing
/// - Message batching -> Disk write
/// - Raft log append -> Replication
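///
/// A minimal usage sketch (hedged: assumes this module is public under
/// `rivven_core::concurrent`; the path may differ in the actual crate layout):
///
/// ```ignore
/// use rivven_core::concurrent::LockFreeQueue;
///
/// let queue: LockFreeQueue<u64> = LockFreeQueue::bounded(2);
/// queue.try_push(1).unwrap();
/// queue.try_push(2).unwrap();
///
/// // The queue is full, so the third push is rejected (backpressure) and the
/// // item is handed back to the caller.
/// assert_eq!(queue.try_push(3), Err(3));
/// assert_eq!(queue.stats().blocked_enqueues, 1);
///
/// assert_eq!(queue.try_pop(), Some(1));
/// assert_eq!(queue.pop_batch(10), vec![2]);
/// ```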
pub struct LockFreeQueue<T> {
    sender: Sender<T>,
    receiver: Receiver<T>,
    capacity: usize,
    /// Number of items currently in queue (approximate)
    len: AtomicUsize,
    /// Total items ever enqueued
    total_enqueued: AtomicU64,
    /// Total items ever dequeued
    total_dequeued: AtomicU64,
    /// Number of times enqueue was blocked (backpressure)
    blocked_enqueues: AtomicU64,
}

impl<T> LockFreeQueue<T> {
    /// Create a new bounded queue
    pub fn bounded(capacity: usize) -> Self {
        let (sender, receiver) = bounded(capacity);
        Self {
            sender,
            receiver,
            capacity,
            len: AtomicUsize::new(0),
            total_enqueued: AtomicU64::new(0),
            total_dequeued: AtomicU64::new(0),
            blocked_enqueues: AtomicU64::new(0),
        }
    }

    /// Create a new unbounded queue (use with caution)
    pub fn unbounded() -> Self {
        let (sender, receiver) = unbounded();
        Self {
            sender,
            receiver,
            capacity: usize::MAX,
            len: AtomicUsize::new(0),
            total_enqueued: AtomicU64::new(0),
            total_dequeued: AtomicU64::new(0),
            blocked_enqueues: AtomicU64::new(0),
        }
    }

    /// Try to enqueue an item without blocking
    pub fn try_push(&self, item: T) -> Result<(), T> {
        match self.sender.try_send(item) {
            Ok(()) => {
                self.len.fetch_add(1, Ordering::Relaxed);
                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(TrySendError::Full(item)) => {
                self.blocked_enqueues.fetch_add(1, Ordering::Relaxed);
                Err(item)
            }
            Err(TrySendError::Disconnected(item)) => Err(item),
        }
    }

    /// Blocking enqueue
    pub fn push(&self, item: T) -> Result<(), T> {
        match self.sender.send(item) {
            Ok(()) => {
                self.len.fetch_add(1, Ordering::Relaxed);
                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(e) => Err(e.0),
        }
    }

    /// Try to dequeue an item without blocking
    pub fn try_pop(&self) -> Option<T> {
        match self.receiver.try_recv() {
            Ok(item) => {
                self.len.fetch_sub(1, Ordering::Relaxed);
                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
                Some(item)
            }
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
        }
    }

    /// Blocking dequeue
    pub fn pop(&self) -> Option<T> {
        match self.receiver.recv() {
            Ok(item) => {
                self.len.fetch_sub(1, Ordering::Relaxed);
                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
                Some(item)
            }
            Err(_) => None,
        }
    }

    /// Try to dequeue up to `max` items
    pub fn pop_batch(&self, max: usize) -> Vec<T> {
        let mut batch = Vec::with_capacity(max.min(64));
        for _ in 0..max {
            match self.receiver.try_recv() {
                Ok(item) => {
                    batch.push(item);
                }
                Err(_) => break,
            }
        }
        let count = batch.len();
        if count > 0 {
            self.len.fetch_sub(count, Ordering::Relaxed);
            self.total_dequeued
                .fetch_add(count as u64, Ordering::Relaxed);
        }
        batch
    }

    /// Get approximate queue length
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Check if queue is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get queue capacity
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Get the fill ratio (0.0 - 1.0)
    pub fn fill_ratio(&self) -> f64 {
        if self.capacity == usize::MAX {
            0.0
        } else {
            self.len() as f64 / self.capacity as f64
        }
    }

    /// Get statistics
    pub fn stats(&self) -> QueueStats {
        QueueStats {
            len: self.len(),
            capacity: self.capacity,
            total_enqueued: self.total_enqueued.load(Ordering::Relaxed),
            total_dequeued: self.total_dequeued.load(Ordering::Relaxed),
            blocked_enqueues: self.blocked_enqueues.load(Ordering::Relaxed),
        }
    }
}

/// Queue statistics
#[derive(Debug, Clone)]
pub struct QueueStats {
    pub len: usize,
    pub capacity: usize,
    pub total_enqueued: u64,
    pub total_dequeued: u64,
    pub blocked_enqueues: u64,
}

// ============================================================================
// Lock-Free Append-Only Log
// ============================================================================

/// Configuration for the append-only log
#[derive(Debug, Clone)]
pub struct AppendLogConfig {
    /// Size of each segment in bytes
    pub segment_size: usize,
    /// Maximum number of segments to keep in memory
    pub max_segments: usize,
    /// Whether to preallocate segments
    pub preallocate: bool,
}

impl Default for AppendLogConfig {
    fn default() -> Self {
        Self {
            segment_size: 64 * 1024 * 1024, // 64 MB
            max_segments: 4,
            preallocate: true,
        }
    }
}

/// A segment in the append-only log
struct LogSegment {
    /// Segment data
    data: Vec<u8>,
    /// Current write position
    write_pos: AtomicUsize,
    /// Segment base offset
    base_offset: u64,
    /// Is this segment sealed (no more writes)?
    sealed: AtomicBool,
}
impl LogSegment {
    fn new(base_offset: u64, capacity: usize, preallocate: bool) -> Self {
        // The backing buffer must always be `capacity` bytes long: `try_append`
        // reserves ranges against `data.len()` and `read` slices into it, so a
        // capacity-only (zero-length) Vec would reject every append and force
        // endless segment rotation.
        let mut data = vec![0u8; capacity];

        if preallocate {
            // Touch one byte per 4 KiB page so the zeroed pages are committed
            // up front and early appends do not take page faults. `black_box`
            // keeps the stores from being optimized away.
            for i in (0..data.len()).step_by(4096) {
                data[i] = std::hint::black_box(0);
            }
        }

        Self {
            data,
            write_pos: AtomicUsize::new(0),
            base_offset,
            sealed: AtomicBool::new(false),
        }
    }

    /// Try to append data to this segment
    /// Returns (position, entry_offset) on success
    fn try_append(&self, data: &[u8]) -> Option<(usize, u64)> {
        if self.sealed.load(Ordering::Acquire) {
            return None;
        }

        let needed = 4 + data.len(); // 4 bytes for length prefix

        // CAS loop to reserve space
        loop {
            let current_pos = self.write_pos.load(Ordering::Acquire);
            let new_pos = current_pos + needed;

            if new_pos > self.data.len() {
                // Segment is full, seal it
                self.sealed.store(true, Ordering::Release);
                return None;
            }

            // Try to reserve space
            match self.write_pos.compare_exchange_weak(
                current_pos,
                new_pos,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Space reserved, write data
                    // SAFETY: We have exclusive access to [current_pos..new_pos]
                    let ptr = self.data.as_ptr() as *mut u8;
                    unsafe {
                        // Write length prefix (big-endian)
                        let len = data.len() as u32;
                        let len_bytes = len.to_be_bytes();
                        std::ptr::copy_nonoverlapping(len_bytes.as_ptr(), ptr.add(current_pos), 4);

                        // Write data
                        std::ptr::copy_nonoverlapping(
                            data.as_ptr(),
                            ptr.add(current_pos + 4),
                            data.len(),
                        );
                    }

                    let offset = self.base_offset + current_pos as u64;
                    return Some((current_pos, offset));
                }
                Err(_) => {
                    // CAS failed, retry
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Read an entry at the given position
    fn read(&self, position: usize) -> Option<&[u8]> {
        let committed = self.write_pos.load(Ordering::Acquire);

        if position + 4 > committed {
            return None;
        }

        // Read length prefix
        let len_bytes: [u8; 4] = self.data[position..position + 4].try_into().ok()?;
        let len = u32::from_be_bytes(len_bytes) as usize;

        if position + 4 + len > committed {
            return None;
        }

        Some(&self.data[position + 4..position + 4 + len])
    }

    /// Get committed size
    fn committed_size(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    /// Check if segment is sealed
    fn is_sealed(&self) -> bool {
        self.sealed.load(Ordering::Acquire)
    }
}

/// A lock-free append-only log for high-throughput message storage
///
/// Design goals:
/// - Lock-free appends from multiple producers
/// - Sequential reads optimized for batching
/// - Memory-efficient with segment rotation
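///
/// A minimal usage sketch (hedged: assumes this module is public under
/// `rivven_core::concurrent`):
///
/// ```ignore
/// use rivven_core::concurrent::{AppendLogConfig, AppendOnlyLog};
///
/// let log = AppendOnlyLog::new(AppendLogConfig::default());
///
/// let first = log.append(b"record-1");
/// let offsets = log.append_batch(&[b"record-2".as_slice(), b"record-3".as_slice()]);
///
/// // Offsets are byte positions: each entry advances them by 4 + payload_len.
/// assert!(offsets[0] > first);
///
/// // Read back up to 16 entries starting at the first offset.
/// let entries = log.read(first, 16);
/// assert_eq!(&entries[0][..], b"record-1");
/// ```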
pub struct AppendOnlyLog {
    /// Configuration
    config: AppendLogConfig,
    /// Active segments
    segments: RwLock<Vec<Arc<LogSegment>>>,
    /// Total bytes written
    total_bytes: AtomicU64,
    /// Total entries written (also serves as global offset counter)
    total_entries: AtomicU64,
}

impl AppendOnlyLog {
    /// Create a new append-only log
    pub fn new(config: AppendLogConfig) -> Self {
        let initial_segment = Arc::new(LogSegment::new(0, config.segment_size, config.preallocate));

        Self {
            config,
            segments: RwLock::new(vec![initial_segment]),
            total_bytes: AtomicU64::new(0),
            total_entries: AtomicU64::new(0),
        }
    }

    /// Append data to the log, returns the offset
    pub fn append(&self, data: &[u8]) -> u64 {
        loop {
            // Try to append to current segment
            {
                let segments = self.segments.read();
                if let Some(segment) = segments.last() {
                    if let Some((_, offset)) = segment.try_append(data) {
                        self.total_bytes
                            .fetch_add(data.len() as u64, Ordering::Relaxed);
                        self.total_entries.fetch_add(1, Ordering::Relaxed);
                        return offset;
                    }
                }
            }

            // Segment is full, need to create a new one
            self.rotate_segment();
        }
    }

    /// Append a batch of entries, returns vec of offsets
    pub fn append_batch(&self, entries: &[&[u8]]) -> Vec<u64> {
        let mut offsets = Vec::with_capacity(entries.len());

        for data in entries {
            offsets.push(self.append(data));
        }

        offsets
    }

    /// Rotate to a new segment
    fn rotate_segment(&self) {
        let mut segments = self.segments.write();

        // Double-check the last segment is actually sealed
        if let Some(last) = segments.last() {
            if !last.is_sealed() {
                // Another thread may have already rotated
                return;
            }
        }

        // Calculate next base offset
        let next_base = segments
            .last()
            .map(|s| s.base_offset + s.committed_size() as u64)
            .unwrap_or(0);

        // Create new segment
        let new_segment = Arc::new(LogSegment::new(
            next_base,
            self.config.segment_size,
            self.config.preallocate,
        ));

        segments.push(new_segment);

        // Remove old segments if we have too many
        while segments.len() > self.config.max_segments {
            segments.remove(0);
        }
    }

    /// Read entries starting from an offset
    pub fn read(&self, start_offset: u64, max_entries: usize) -> Vec<Bytes> {
        let segments = self.segments.read();
        let mut entries = Vec::with_capacity(max_entries);

        // Find the segment containing start_offset
        for segment in segments.iter() {
            if segment.base_offset > start_offset {
                continue;
            }

            let relative_pos = (start_offset - segment.base_offset) as usize;
            let mut pos = relative_pos;

            while entries.len() < max_entries {
                match segment.read(pos) {
                    Some(data) => {
                        entries.push(Bytes::copy_from_slice(data));
                        pos += 4 + data.len(); // Move to next entry
                    }
                    None => break,
                }
            }
        }

        entries
    }

    /// Get total bytes written
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Relaxed)
    }

    /// Get total entries written
    pub fn total_entries(&self) -> u64 {
        self.total_entries.load(Ordering::Relaxed)
    }

    /// Get current end offset
    pub fn end_offset(&self) -> u64 {
        let segments = self.segments.read();
        segments
            .last()
            .map(|s| s.base_offset + s.committed_size() as u64)
            .unwrap_or(0)
    }

    /// Get number of segments
    pub fn segment_count(&self) -> usize {
        self.segments.read().len()
    }
}

// ============================================================================
// Sharded Concurrent HashMap
// ============================================================================

/// Number of shards (should be power of 2)
const SHARD_COUNT: usize = 64;

/// A cache-friendly sharded concurrent hash map
///
/// Uses multiple shards to reduce contention. Each shard has its own lock,
/// so operations on different keys in different shards can proceed in parallel.
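///
/// A minimal usage sketch (hedged: the `rivven_core::concurrent` path is an
/// assumption about the crate layout):
///
/// ```ignore
/// use rivven_core::concurrent::ConcurrentHashMap;
///
/// let offsets: ConcurrentHashMap<String, u64> = ConcurrentHashMap::new();
/// offsets.insert("topic-a/0".to_string(), 42);
///
/// // Readers only take the owning shard's read lock, so lookups on other
/// // shards proceed in parallel.
/// assert_eq!(offsets.get(&"topic-a/0".to_string()), Some(42));
///
/// // Bump the stored value in place under the shard's write lock.
/// offsets.update(&"topic-a/0".to_string(), |v| *v += 1);
/// assert_eq!(offsets.get(&"topic-a/0".to_string()), Some(43));
/// ```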
pub struct ConcurrentHashMap<K, V> {
    shards: [CacheAligned<RwLock<HashMap<K, V>>>; SHARD_COUNT],
    len: AtomicUsize,
}

impl<K: Hash + Eq + Clone, V: Clone> ConcurrentHashMap<K, V> {
    /// Create a new concurrent hash map
    pub fn new() -> Self {
        // Initialize all shards
        let shards = std::array::from_fn(|_| CacheAligned(RwLock::new(HashMap::new())));

        Self {
            shards,
            len: AtomicUsize::new(0),
        }
    }

    /// Get the shard index for a key
    fn shard_index(&self, key: &K) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish() as usize % SHARD_COUNT
    }

    /// Insert a key-value pair
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        let old = shard.insert(key, value);
        if old.is_none() {
            self.len.fetch_add(1, Ordering::Relaxed);
        }
        old
    }

    /// Get a value by key
    pub fn get(&self, key: &K) -> Option<V> {
        let shard_idx = self.shard_index(key);
        let shard = self.shards[shard_idx].read();
        shard.get(key).cloned()
    }

    /// Check if key exists
    pub fn contains_key(&self, key: &K) -> bool {
        let shard_idx = self.shard_index(key);
        let shard = self.shards[shard_idx].read();
        shard.contains_key(key)
    }

    /// Remove a key
    pub fn remove(&self, key: &K) -> Option<V> {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write();

        let removed = shard.remove(key);
        if removed.is_some() {
            self.len.fetch_sub(1, Ordering::Relaxed);
        }
        removed
    }

    /// Get approximate length
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Apply a function to a value
    pub fn update<F>(&self, key: &K, f: F) -> Option<V>
    where
        F: FnOnce(&mut V),
    {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get_mut(key) {
            f(value);
            Some(value.clone())
        } else {
            None
        }
    }

    /// Get or insert with a default value
    pub fn get_or_insert(&self, key: K, default: V) -> V {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get(&key) {
            value.clone()
        } else {
            self.len.fetch_add(1, Ordering::Relaxed);
            shard.insert(key, default.clone());
            default
        }
    }

    /// Get or insert with a closure
    pub fn get_or_insert_with<F>(&self, key: K, f: F) -> V
    where
        F: FnOnce() -> V,
    {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get(&key) {
            value.clone()
        } else {
            let value = f();
            self.len.fetch_add(1, Ordering::Relaxed);
            shard.insert(key, value.clone());
            value
        }
    }

    /// Iterate over all entries (snapshot)
    pub fn snapshot(&self) -> Vec<(K, V)> {
        let mut entries = Vec::new();

        for shard in &self.shards {
            let shard = shard.read();
            for (k, v) in shard.iter() {
                entries.push((k.clone(), v.clone()));
            }
        }

        entries
    }

    /// Clear all entries
    pub fn clear(&self) {
        for shard in &self.shards {
            shard.write().clear();
        }
        self.len.store(0, Ordering::Relaxed);
    }
}

impl<K: Hash + Eq + Clone, V: Clone> Default for ConcurrentHashMap<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

// ============================================================================
// Lock-Free Skip List for Offset Indexing
// ============================================================================

/// Maximum height for skip list nodes
const MAX_HEIGHT: usize = 32;

/// A skip list node
struct SkipNode<K, V> {
    key: K,
    value: UnsafeCell<V>,
    /// Forward pointers for each level
    forward: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
    /// Number of levels this node participates in (used for probabilistic balancing)
    #[allow(dead_code)]
    height: usize,
}

impl<K, V> SkipNode<K, V> {
    fn new(key: K, value: V, height: usize) -> *mut Self {
        let forward = std::array::from_fn(|_| AtomicPtr::new(std::ptr::null_mut()));

        let node = Box::new(Self {
            key,
            value: UnsafeCell::new(value),
            forward,
            height,
        });

        Box::into_raw(node)
    }
}

/// A concurrent skip list optimized for offset lookups
///
/// Provides O(log n) lookups for finding messages by offset, which is
/// critical for efficient consumer fetching.
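///
/// A minimal usage sketch indexing byte positions by message offset (hedged:
/// the `rivven_core::concurrent` path is an assumption about the crate layout):
///
/// ```ignore
/// use rivven_core::concurrent::ConcurrentSkipList;
///
/// let index: ConcurrentSkipList<u64, u64> = ConcurrentSkipList::new();
/// index.insert(0, 0);        // offset 0   -> byte position 0
/// index.insert(100, 4_096);  // offset 100 -> byte position 4096
/// index.insert(200, 9_150);  // offset 200 -> byte position 9150
///
/// assert_eq!(index.get(&100), Some(4_096));
/// // Entries between two offsets, capped at 16 results.
/// assert_eq!(index.range(&0, &150, 16).len(), 2);
/// ```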
pub struct ConcurrentSkipList<K: Ord + Clone, V: Clone> {
    /// Sentinel head node
    head: *mut SkipNode<K, V>,
    /// Current maximum height
    max_level: AtomicUsize,
    /// Number of elements
    len: AtomicUsize,
    /// Random state for level generation
    rand_state: AtomicU64,
}

// SAFETY: The skip list is thread-safe through atomic operations
unsafe impl<K: Ord + Clone + Send, V: Clone + Send> Send for ConcurrentSkipList<K, V> {}
unsafe impl<K: Ord + Clone + Sync, V: Clone + Sync> Sync for ConcurrentSkipList<K, V> {}

impl<K: Ord + Clone + Default, V: Clone + Default> ConcurrentSkipList<K, V> {
    /// Create a new concurrent skip list
    pub fn new() -> Self {
        // Create head sentinel
        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);

        Self {
            head,
            max_level: AtomicUsize::new(1),
            len: AtomicUsize::new(0),
            rand_state: AtomicU64::new(0x12345678),
        }
    }

    /// Generate a random level for a new node
    fn random_level(&self) -> usize {
        // XORShift random number generation
        let mut level = 1;
        let mut x = self.rand_state.load(Ordering::Relaxed);

        loop {
            x ^= x << 13;
            x ^= x >> 7;
            x ^= x << 17;
            self.rand_state.store(x, Ordering::Relaxed);

            if x & 1 == 0 || level >= MAX_HEIGHT {
                break;
            }
            level += 1;
        }

        level
    }

    /// Insert a key-value pair
    pub fn insert(&self, key: K, value: V) {
        let height = self.random_level();
        let new_node = SkipNode::new(key.clone(), value, height);

        // Update max level if needed
        let mut current_max = self.max_level.load(Ordering::Relaxed);
        while height > current_max {
            match self.max_level.compare_exchange_weak(
                current_max,
                height,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(m) => current_max = m,
            }
        }

        // Find position and insert
        let mut update = [std::ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
        let mut current = self.head;

        #[allow(clippy::needless_range_loop)]
        // Intentional: unsafe pointer array requires explicit indexing
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: We traverse from head which is always valid. Each `forward` pointer
            // is either null or points to a valid SkipNode allocated by `insert`.
            // Acquire ordering ensures we see the complete node data.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= key {
                        break;
                    }
                    current = next;
                }
                update[i] = current;
            }
        }

        // Insert node at each level
        #[allow(clippy::needless_range_loop)]
        for i in 0..height {
            // SAFETY: `pred` is either `self.head` (always valid) or a node from `update`
            // which was found during traversal. `new_node` was just allocated above.
            // Release ordering ensures the new node's data is visible before the pointer.
            unsafe {
                let pred = if update[i].is_null() {
                    self.head
                } else {
                    update[i]
                };
                let next = (*pred).forward[i].load(Ordering::Acquire);
                (*new_node).forward[i].store(next, Ordering::Release);
                (*pred).forward[i].store(new_node, Ordering::Release);
            }
        }

        self.len.fetch_add(1, Ordering::Relaxed);
    }

    /// Find a value by key
    pub fn get(&self, key: &K) -> Option<V> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            // Acquire ordering ensures we see the node's complete data.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key == *key {
                        return Some((*(*next).value.get()).clone());
                    }
                    if (*next).key > *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        None
    }

    /// Find the greatest key less than or equal to the given key
    /// This is useful for finding the closest offset <= target
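    ///
    /// A hedged illustrative sketch of the lookup pattern (assumes the type is
    /// in scope):
    ///
    /// ```ignore
    /// let index = ConcurrentSkipList::<u64, u64>::new();
    /// index.insert(0, 0);
    /// index.insert(100, 4_096);
    ///
    /// // A fetch at offset 150 should start scanning from the entry at 100.
    /// assert_eq!(index.floor(&150), Some((100, 4_096)));
    /// assert_eq!(index.floor(&100), Some((100, 4_096)));
    /// ```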
    pub fn floor(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;
        let mut result: Option<*mut SkipNode<K, V>> = None;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            // Acquire ordering ensures visibility of node data.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key <= *key {
                        result = Some(next);
                        current = next;
                    } else {
                        break;
                    }
                }
            }
        }

        // SAFETY: `node` was obtained from traversal and is a valid SkipNode pointer.
        result.map(|node| unsafe { ((*node).key.clone(), (*(*node).value.get()).clone()) })
    }

    /// Find the smallest key greater than or equal to the given key
    pub fn ceiling(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        // SAFETY: `current` is a valid node from traversal (or head).
        // The next pointer is either null or points to a valid SkipNode.
        unsafe {
            let next = (*current).forward[0].load(Ordering::Acquire);
            if !next.is_null() {
                Some(((*next).key.clone(), (*(*next).value.get()).clone()))
            } else {
                None
            }
        }
    }

    /// Get number of elements
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get a range of entries
    pub fn range(&self, start: &K, end: &K, limit: usize) -> Vec<(K, V)> {
        let mut entries = Vec::with_capacity(limit.min(1000));
        let mut current = self.head;

        // Find start position
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *start {
                        break;
                    }
                    current = next;
                }
            }
        }

        // Collect entries in range
        // SAFETY: `current` is valid from traversal. We iterate level-0 forward pointers
        // which are either null or point to valid SkipNodes allocated by `insert`.
        unsafe {
            let mut node = (*current).forward[0].load(Ordering::Acquire);
            while !node.is_null() && entries.len() < limit {
                if (*node).key > *end {
                    break;
                }
                entries.push(((*node).key.clone(), (*(*node).value.get()).clone()));
                node = (*node).forward[0].load(Ordering::Acquire);
            }
        }

        entries
    }
}

impl<K: Ord + Clone + Default, V: Clone + Default> Default for ConcurrentSkipList<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

impl<K: Ord + Clone, V: Clone> Drop for ConcurrentSkipList<K, V> {
    fn drop(&mut self) {
        // Free all nodes
        let mut current = self.head;
        // SAFETY: We have exclusive access via `&mut self`. All nodes were allocated
        // by `insert` using `Box::into_raw`. We traverse level-0 to visit every node
        // exactly once, converting them back to Box for proper deallocation.
        unsafe {
            while !current.is_null() {
                let next = (*current).forward[0].load(Ordering::Relaxed);
                drop(Box::from_raw(current));
                current = next;
            }
        }
    }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_lock_free_queue() {
        let queue = LockFreeQueue::<i32>::bounded(100);

        assert!(queue.is_empty());

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert!(queue.is_empty());
    }

    #[test]
    fn test_lock_free_queue_concurrent() {
        let queue = Arc::new(LockFreeQueue::<i32>::bounded(1000));
        let mut handles = vec![];

        // Spawn producers
        for i in 0..4 {
            let q = queue.clone();
            handles.push(thread::spawn(move || {
                for j in 0..250 {
                    q.push(i * 250 + j).unwrap();
                }
            }));
        }

        // Wait for producers
        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(queue.len(), 1000);

        // Consume all
        let batch = queue.pop_batch(1000);
        assert_eq!(batch.len(), 1000);
    }

    #[test]
    fn test_append_only_log() {
        let config = AppendLogConfig {
            segment_size: 1024,
            max_segments: 4,
            preallocate: true,
        };
        let log = AppendOnlyLog::new(config);

        let offset1 = log.append(b"hello");
        let offset2 = log.append(b"world");

        assert!(offset2 > offset1);

        let entries = log.read(offset1, 10);
        assert_eq!(entries.len(), 2);
        assert_eq!(&entries[0][..], b"hello");
        assert_eq!(&entries[1][..], b"world");
    }
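
    // A small rotation sketch: with a segment size this small, appends spill
    // past the first segment, sealing it and exercising `rotate_segment`.
    // Each entry occupies 4 bytes of length prefix plus its payload.
    #[test]
    fn test_append_only_log_rotation() {
        let config = AppendLogConfig {
            segment_size: 256,
            max_segments: 8,
            preallocate: true,
        };
        let log = AppendOnlyLog::new(config);

        for i in 0..64u32 {
            log.append(&i.to_be_bytes());
        }

        assert_eq!(log.total_entries(), 64);
        assert!(log.segment_count() > 1);
        assert!(log.segment_count() <= 8);
    }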

    #[test]
    fn test_concurrent_hash_map() {
        let map = Arc::new(ConcurrentHashMap::<String, i32>::new());
        let mut handles = vec![];

        // Concurrent inserts
        for i in 0..4 {
            let m = map.clone();
            handles.push(thread::spawn(move || {
                for j in 0..250 {
                    m.insert(format!("key-{}-{}", i, j), i * 250 + j);
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(map.len(), 1000);

        // Verify reads
        assert_eq!(map.get(&"key-0-0".to_string()), Some(0));
        assert_eq!(map.get(&"key-3-249".to_string()), Some(999));
    }
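
    // A small supplementary sketch exercising `update` and `get_or_insert_with`,
    // which the test above does not cover.
    #[test]
    fn test_concurrent_hash_map_update() {
        let map = ConcurrentHashMap::<String, i32>::new();

        // Missing key: `update` is a no-op and returns None.
        assert_eq!(map.update(&"counter".to_string(), |v| *v += 1), None);

        // Lazily insert, then bump the stored value in place.
        assert_eq!(map.get_or_insert_with("counter".to_string(), || 41), 41);
        assert_eq!(map.update(&"counter".to_string(), |v| *v += 1), Some(42));
        assert_eq!(map.get(&"counter".to_string()), Some(42));
        assert_eq!(map.len(), 1);
    }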

    #[test]
    fn test_skip_list() {
        let list = ConcurrentSkipList::<u64, String>::new();

        list.insert(10, "ten".to_string());
        list.insert(20, "twenty".to_string());
        list.insert(5, "five".to_string());
        list.insert(15, "fifteen".to_string());

        assert_eq!(list.len(), 4);
        assert_eq!(list.get(&10), Some("ten".to_string()));
        assert_eq!(list.get(&99), None);

        // Floor test
        assert_eq!(list.floor(&12), Some((10, "ten".to_string())));
        assert_eq!(list.floor(&15), Some((15, "fifteen".to_string())));

        // Ceiling test
        assert_eq!(list.ceiling(&12), Some((15, "fifteen".to_string())));
        assert_eq!(list.ceiling(&1), Some((5, "five".to_string())));
    }

    #[test]
    fn test_skip_list_range() {
        let list = ConcurrentSkipList::<u64, String>::new();

        for i in 0..100 {
            list.insert(i * 10, format!("value-{}", i));
        }

        let range = list.range(&150, &350, 100);
        assert!(!range.is_empty());

        // Should include 150, 160, ..., 350
        for (k, _) in &range {
            assert!(*k >= 150 && *k <= 350);
        }
    }
}