// rivven_core/concurrent.rs
//! Lock-Free Concurrent Data Structures
//!
//! High-performance concurrent data structures optimized for streaming workloads:
//!
//! - **LockFreeQueue**: MPMC queue with bounded/unbounded variants
//! - **AppendOnlyLog**: Lock-free append-only log for message batching
//! - **ConcurrentHashMap**: Read-optimized concurrent map with minimal locking
//! - **SkipList**: Lock-free ordered map for offset indexing
//!
//! # Design Principles
//!
//! 1. **Minimize Contention**: Use atomic operations over locks where possible
//! 2. **Cache-Friendly**: Align structures to cache lines to prevent false sharing
//! 3. **Memory Efficient**: Pool allocations to reduce allocator pressure
//! 4. **Backpressure-Aware**: Bounded structures with overflow policies
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────────────┐
//! │                    Concurrent Data Structures                            │
//! ├─────────────────────────────────────────────────────────────────────────┤
//! │                                                                          │
//! │  ┌────────────────────┐     ┌────────────────────┐                      │
//! │  │   LockFreeQueue    │     │   AppendOnlyLog    │                      │
//! │  │  ┌──┬──┬──┬──┬──┐  │     │  ┌────────────────┐│                      │
//! │  │  │H │  │  │  │T │  │     │  │ Segment 0      ││                      │
//! │  │  └──┴──┴──┴──┴──┘  │     │  │ Segment 1      ││                      │
//! │  │  MPMC lock-free    │     │  │ Segment N      ││                      │
//! │  └────────────────────┘     │  └────────────────┘│                      │
//! │                             │  Lock-free append  │                      │
//! │  ┌────────────────────┐     └────────────────────┘                      │
//! │  │   ConcurrentMap    │                                                 │
//! │  │  ┌──┬──┬──┬──┬──┐  │     ┌────────────────────┐                      │
//! │  │  │S0│S1│S2│S3│SN│  │     │     SkipList       │                      │
//! │  │  └──┴──┴──┴──┴──┘  │     │  Level 3: ──●──────│                      │
//! │  │  Sharded for perf  │     │  Level 2: ──●──●───│                      │
//! │  └────────────────────┘     │  Level 1: ●─●─●─●──│                      │
//! │                             │  O(log n) lookup   │                      │
//! │                             └────────────────────┘                      │
//! └─────────────────────────────────────────────────────────────────────────┘
//! ```

use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TryRecvError, TrySendError};
use parking_lot::RwLock;
52
// ============================================================================
// Cache Line Alignment
// ============================================================================

/// Wrapper that aligns its contents to a 64-byte boundary (one x86-64 cache line).
///
/// Placing independently-updated values (per-shard locks, hot counters) in
/// separate cache lines prevents false sharing between CPU cores.
#[repr(C, align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> std::ops::Deref for CacheAligned<T> {
    type Target = T;

    /// Borrow the wrapped value transparently.
    fn deref(&self) -> &T {
        let CacheAligned(inner) = self;
        inner
    }
}

impl<T> std::ops::DerefMut for CacheAligned<T> {
    /// Mutably borrow the wrapped value transparently.
    fn deref_mut(&mut self) -> &mut T {
        let CacheAligned(inner) = self;
        inner
    }
}
73
// ============================================================================
// Lock-Free MPMC Queue
// ============================================================================

/// A high-performance bounded MPMC queue using crossbeam-channel
///
/// This queue is optimized for producer-consumer patterns common in streaming:
/// - Network receive -> Message processing
/// - Message batching -> Disk write
/// - Raft log append -> Replication
///
/// Statistics counters are maintained with relaxed atomics *after* the channel
/// operation, so they may momentarily lag the true channel state; treat all of
/// them as approximate.
pub struct LockFreeQueue<T> {
    /// Producer handle into the underlying channel.
    sender: Sender<T>,
    /// Consumer handle; held in the same struct as `sender`, so the channel
    /// never disconnects while the queue is alive.
    receiver: Receiver<T>,
    /// Configured capacity (`usize::MAX` for the unbounded variant).
    capacity: usize,
    /// Number of items currently in queue (approximate)
    len: AtomicUsize,
    /// Total items ever enqueued
    total_enqueued: AtomicU64,
    /// Total items ever dequeued
    total_dequeued: AtomicU64,
    /// Number of times enqueue was blocked (backpressure)
    blocked_enqueues: AtomicU64,
}
97
98impl<T> LockFreeQueue<T> {
99    /// Create a new bounded queue
100    pub fn bounded(capacity: usize) -> Self {
101        let (sender, receiver) = bounded(capacity);
102        Self {
103            sender,
104            receiver,
105            capacity,
106            len: AtomicUsize::new(0),
107            total_enqueued: AtomicU64::new(0),
108            total_dequeued: AtomicU64::new(0),
109            blocked_enqueues: AtomicU64::new(0),
110        }
111    }
112
113    /// Create a new unbounded queue (use with caution)
114    pub fn unbounded() -> Self {
115        let (sender, receiver) = unbounded();
116        Self {
117            sender,
118            receiver,
119            capacity: usize::MAX,
120            len: AtomicUsize::new(0),
121            total_enqueued: AtomicU64::new(0),
122            total_dequeued: AtomicU64::new(0),
123            blocked_enqueues: AtomicU64::new(0),
124        }
125    }
126
127    /// Try to enqueue an item without blocking
128    pub fn try_push(&self, item: T) -> Result<(), T> {
129        match self.sender.try_send(item) {
130            Ok(()) => {
131                self.len.fetch_add(1, Ordering::Relaxed);
132                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
133                Ok(())
134            }
135            Err(TrySendError::Full(item)) => {
136                self.blocked_enqueues.fetch_add(1, Ordering::Relaxed);
137                Err(item)
138            }
139            Err(TrySendError::Disconnected(item)) => Err(item),
140        }
141    }
142
143    /// Blocking enqueue
144    pub fn push(&self, item: T) -> Result<(), T> {
145        match self.sender.send(item) {
146            Ok(()) => {
147                self.len.fetch_add(1, Ordering::Relaxed);
148                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
149                Ok(())
150            }
151            Err(e) => Err(e.0),
152        }
153    }
154
155    /// Try to dequeue an item without blocking
156    pub fn try_pop(&self) -> Option<T> {
157        match self.receiver.try_recv() {
158            Ok(item) => {
159                self.len.fetch_sub(1, Ordering::Relaxed);
160                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
161                Some(item)
162            }
163            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
164        }
165    }
166
167    /// Blocking dequeue
168    pub fn pop(&self) -> Option<T> {
169        match self.receiver.recv() {
170            Ok(item) => {
171                self.len.fetch_sub(1, Ordering::Relaxed);
172                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
173                Some(item)
174            }
175            Err(_) => None,
176        }
177    }
178
179    /// Try to dequeue up to `max` items
180    pub fn pop_batch(&self, max: usize) -> Vec<T> {
181        let mut batch = Vec::with_capacity(max.min(64));
182        for _ in 0..max {
183            match self.receiver.try_recv() {
184                Ok(item) => {
185                    batch.push(item);
186                }
187                Err(_) => break,
188            }
189        }
190        let count = batch.len();
191        if count > 0 {
192            self.len.fetch_sub(count, Ordering::Relaxed);
193            self.total_dequeued
194                .fetch_add(count as u64, Ordering::Relaxed);
195        }
196        batch
197    }
198
199    /// Get approximate queue length
200    pub fn len(&self) -> usize {
201        self.len.load(Ordering::Relaxed)
202    }
203
204    /// Check if queue is empty
205    pub fn is_empty(&self) -> bool {
206        self.len() == 0
207    }
208
209    /// Get queue capacity
210    pub fn capacity(&self) -> usize {
211        self.capacity
212    }
213
214    /// Get fill percentage (0.0 - 1.0)
215    pub fn fill_ratio(&self) -> f64 {
216        if self.capacity == usize::MAX {
217            0.0
218        } else {
219            self.len() as f64 / self.capacity as f64
220        }
221    }
222
223    /// Get statistics
224    pub fn stats(&self) -> QueueStats {
225        QueueStats {
226            len: self.len(),
227            capacity: self.capacity,
228            total_enqueued: self.total_enqueued.load(Ordering::Relaxed),
229            total_dequeued: self.total_dequeued.load(Ordering::Relaxed),
230            blocked_enqueues: self.blocked_enqueues.load(Ordering::Relaxed),
231        }
232    }
233}
234
/// Queue statistics
///
/// A point-in-time snapshot gathered from relaxed atomic counters; the fields
/// are individually approximate and not guaranteed to be mutually consistent.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Approximate number of items currently queued.
    pub len: usize,
    /// Configured capacity (`usize::MAX` for unbounded queues).
    pub capacity: usize,
    /// Total items ever enqueued.
    pub total_enqueued: u64,
    /// Total items ever dequeued.
    pub total_dequeued: u64,
    /// Number of non-blocking enqueue attempts rejected because the queue was full.
    pub blocked_enqueues: u64,
}
244
// ============================================================================
// Lock-Free Append-Only Log
// ============================================================================

/// Configuration for the append-only log
#[derive(Debug, Clone)]
pub struct AppendLogConfig {
    /// Size of each segment in bytes
    pub segment_size: usize,
    /// Maximum number of segments to keep in memory
    pub max_segments: usize,
    /// Whether to preallocate segments
    // NOTE(review): `LogSegment::new` currently zero-fills the full segment in
    // both cases, so this flag has no observable effect — confirm intent.
    pub preallocate: bool,
}

impl Default for AppendLogConfig {
    /// Defaults: 64 MiB segments, at most 4 segments resident, preallocation on.
    fn default() -> Self {
        Self {
            segment_size: 64 * 1024 * 1024, // 64 MB
            max_segments: 4,
            preallocate: true,
        }
    }
}
269
/// A segment in the append-only log
///
/// Write protocol: a writer reserves `[pos, pos + len)` by CAS-advancing
/// `write_pos`, copies its bytes into the reserved range, then publishes by
/// advancing `committed` in reservation order. Readers only ever look at
/// bytes below `committed`, so they can never observe a partially written
/// entry. (The previous design had readers trust `write_pos`, which advances
/// at *reservation* time — a reader could see reserved-but-unwritten bytes.)
struct LogSegment {
    /// Segment data — UnsafeCell for interior mutability (lock-free writes via CAS)
    data: UnsafeCell<Vec<u8>>,
    /// Reservation pointer: bytes below this are reserved, possibly still being written.
    write_pos: AtomicUsize,
    /// Publication pointer: bytes below this are fully written and safe to read.
    committed: AtomicUsize,
    /// Segment capacity (cached to avoid accessing data through UnsafeCell)
    capacity: usize,
    /// Segment base offset
    base_offset: u64,
    /// Is this segment sealed (no more writes)?
    sealed: AtomicBool,
}

// SAFETY: The CAS on `write_pos` guarantees each writer exclusive access to
// its reserved byte range. Readers only dereference bytes below `committed`,
// which is advanced with Release ordering *after* the bytes are written and
// loaded with Acquire before reading, preventing torn reads.
unsafe impl Send for LogSegment {}
unsafe impl Sync for LogSegment {}

impl LogSegment {
    /// Create a segment with the given base offset and capacity.
    fn new(base_offset: u64, capacity: usize, preallocate: bool) -> Self {
        // The buffer must be fully zero-initialized regardless of `preallocate`:
        // `read` slices into the Vec, which requires its len to cover every
        // position up to `capacity`. The flag is kept for interface
        // compatibility but currently has no effect (the original code had two
        // identical branches here).
        let _ = preallocate;
        Self {
            capacity,
            data: UnsafeCell::new(vec![0u8; capacity]),
            write_pos: AtomicUsize::new(0),
            committed: AtomicUsize::new(0),
            base_offset,
            sealed: AtomicBool::new(false),
        }
    }

    /// Try to append data to this segment
    /// Returns (position, entry_offset) on success; `None` seals the segment.
    fn try_append(&self, data: &[u8]) -> Option<(usize, u64)> {
        if self.sealed.load(Ordering::Acquire) {
            return None;
        }

        let needed = 4 + data.len(); // 4 bytes for length prefix

        // CAS loop to reserve [current_pos, new_pos)
        let (current_pos, new_pos) = loop {
            let current_pos = self.write_pos.load(Ordering::Acquire);
            // checked_add guards against pathological lengths near usize::MAX.
            let new_pos = match current_pos.checked_add(needed) {
                Some(p) if p <= self.capacity => p,
                _ => {
                    // Entry doesn't fit: seal the segment so the log rotates.
                    self.sealed.store(true, Ordering::Release);
                    return None;
                }
            };
            match self.write_pos.compare_exchange_weak(
                current_pos,
                new_pos,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break (current_pos, new_pos),
                Err(_) => std::hint::spin_loop(), // lost the race — retry
            }
        };

        // SAFETY: The CAS above gave us exclusive write access to
        // [current_pos, new_pos). Readers never touch bytes >= `committed`,
        // which is still <= current_pos for this range.
        // NOTE(review): creating `&mut Vec` here while readers hold `&Vec`
        // for *disjoint* committed bytes is the same aliasing pattern as the
        // original code; a strict Stacked Borrows run (miri) may flag it —
        // a raw-slice buffer would be the fully pedantic fix.
        unsafe {
            let buf = &mut *self.data.get();
            let ptr = buf.as_mut_ptr();
            // Length prefix, big-endian.
            let len_bytes = (data.len() as u32).to_be_bytes();
            std::ptr::copy_nonoverlapping(len_bytes.as_ptr(), ptr.add(current_pos), 4);
            // Payload.
            std::ptr::copy_nonoverlapping(data.as_ptr(), ptr.add(current_pos + 4), data.len());
        }

        // Publish in reservation order: wait until every earlier reservation
        // has committed, then advance `committed` past our range. Release
        // makes the byte copies above visible to Acquire readers.
        while self
            .committed
            .compare_exchange_weak(current_pos, new_pos, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }

        Some((current_pos, self.base_offset + current_pos as u64))
    }

    /// Read an entry at the given position; `None` if it is not fully published.
    fn read(&self, position: usize) -> Option<&[u8]> {
        let committed = self.committed.load(Ordering::Acquire);

        if position + 4 > committed {
            return None;
        }

        // SAFETY: every byte below `committed` has been fully written (Release
        // store in try_append) and is never mutated again.
        let buf = unsafe { &*self.data.get() };

        // Length prefix, big-endian.
        let len_bytes: [u8; 4] = buf[position..position + 4].try_into().ok()?;
        let len = u32::from_be_bytes(len_bytes) as usize;

        if position + 4 + len > committed {
            return None;
        }

        Some(&buf[position + 4..position + 4 + len])
    }

    /// Bytes reserved so far. Used for the next segment's base offset, so it
    /// intentionally reports reservations (which may momentarily exceed the
    /// published `committed` value) to keep offsets non-overlapping.
    fn committed_size(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    /// Check if segment is sealed
    fn is_sealed(&self) -> bool {
        self.sealed.load(Ordering::Acquire)
    }
}
400
/// A lock-free append-only log for high-throughput message storage
///
/// Design goals:
/// - Lock-free appends from multiple producers
/// - Sequential reads optimized for batching
/// - Memory-efficient with segment rotation
pub struct AppendOnlyLog {
    /// Configuration
    config: AppendLogConfig,
    /// Active segments, oldest first; only the last segment accepts writes.
    /// The RwLock is write-locked only during segment rotation.
    segments: RwLock<Vec<Arc<LogSegment>>>,
    /// Total payload bytes written (excludes the 4-byte length prefixes)
    total_bytes: AtomicU64,
    /// Total entries written (offsets returned by `append` are byte-based,
    /// not derived from this counter)
    total_entries: AtomicU64,
}
417
418impl AppendOnlyLog {
419    /// Create a new append-only log
420    pub fn new(config: AppendLogConfig) -> Self {
421        let initial_segment = Arc::new(LogSegment::new(0, config.segment_size, config.preallocate));
422
423        Self {
424            config,
425            segments: RwLock::new(vec![initial_segment]),
426            total_bytes: AtomicU64::new(0),
427            total_entries: AtomicU64::new(0),
428        }
429    }
430
431    /// Append data to the log, returns the offset
432    pub fn append(&self, data: &[u8]) -> u64 {
433        loop {
434            // Try to append to current segment
435            {
436                let segments = self.segments.read();
437                if let Some(segment) = segments.last() {
438                    if let Some((_, offset)) = segment.try_append(data) {
439                        self.total_bytes
440                            .fetch_add(data.len() as u64, Ordering::Relaxed);
441                        self.total_entries.fetch_add(1, Ordering::Relaxed);
442                        return offset;
443                    }
444                }
445            }
446
447            // Segment is full, need to create a new one
448            self.rotate_segment();
449        }
450    }
451
452    /// Append a batch of entries, returns vec of offsets
453    pub fn append_batch(&self, entries: &[&[u8]]) -> Vec<u64> {
454        let mut offsets = Vec::with_capacity(entries.len());
455
456        for data in entries {
457            offsets.push(self.append(data));
458        }
459
460        offsets
461    }
462
463    /// Rotate to a new segment
464    fn rotate_segment(&self) {
465        let mut segments = self.segments.write();
466
467        // Double-check the last segment is actually sealed
468        if let Some(last) = segments.last() {
469            if !last.is_sealed() {
470                // Another thread may have already rotated
471                return;
472            }
473        }
474
475        // Calculate next base offset
476        let next_base = segments
477            .last()
478            .map(|s| s.base_offset + s.committed_size() as u64)
479            .unwrap_or(0);
480
481        // Create new segment
482        let new_segment = Arc::new(LogSegment::new(
483            next_base,
484            self.config.segment_size,
485            self.config.preallocate,
486        ));
487
488        segments.push(new_segment);
489
490        // Remove old segments if we have too many
491        while segments.len() > self.config.max_segments {
492            segments.remove(0);
493        }
494    }
495
496    /// Read entries starting from an offset
497    pub fn read(&self, start_offset: u64, max_entries: usize) -> Vec<Bytes> {
498        let segments = self.segments.read();
499        let mut entries = Vec::with_capacity(max_entries);
500
501        // Find the segment containing start_offset
502        for segment in segments.iter() {
503            if segment.base_offset > start_offset {
504                continue;
505            }
506
507            let relative_pos = (start_offset - segment.base_offset) as usize;
508            let mut pos = relative_pos;
509
510            while entries.len() < max_entries {
511                match segment.read(pos) {
512                    Some(data) => {
513                        entries.push(Bytes::copy_from_slice(data));
514                        pos += 4 + data.len(); // Move to next entry
515                    }
516                    None => break,
517                }
518            }
519        }
520
521        entries
522    }
523
524    /// Get total bytes written
525    pub fn total_bytes(&self) -> u64 {
526        self.total_bytes.load(Ordering::Relaxed)
527    }
528
529    /// Get total entries written
530    pub fn total_entries(&self) -> u64 {
531        self.total_entries.load(Ordering::Relaxed)
532    }
533
534    /// Get current end offset
535    pub fn end_offset(&self) -> u64 {
536        let segments = self.segments.read();
537        segments
538            .last()
539            .map(|s| s.base_offset + s.committed_size() as u64)
540            .unwrap_or(0)
541    }
542
543    /// Get number of segments
544    pub fn segment_count(&self) -> usize {
545        self.segments.read().len()
546    }
547}
548
// ============================================================================
// Sharded Concurrent HashMap
// ============================================================================

/// Number of shards (should be power of 2)
const SHARD_COUNT: usize = 64;

/// A cache-friendly sharded concurrent hash map
///
/// Uses multiple shards to reduce contention. Each shard has its own lock,
/// so operations on different keys in different shards can proceed in parallel.
pub struct ConcurrentHashMap<K, V> {
    /// One read-write lock per shard; `CacheAligned` keeps each lock on its
    /// own cache line so shard locks don't false-share.
    shards: [CacheAligned<RwLock<HashMap<K, V>>>; SHARD_COUNT],
    /// Approximate total entry count across all shards (relaxed updates).
    len: AtomicUsize,
}
564
565impl<K: Hash + Eq + Clone, V: Clone> ConcurrentHashMap<K, V> {
566    /// Create a new concurrent hash map
567    pub fn new() -> Self {
568        // Initialize all shards
569        let shards = std::array::from_fn(|_| CacheAligned(RwLock::new(HashMap::new())));
570
571        Self {
572            shards,
573            len: AtomicUsize::new(0),
574        }
575    }
576
577    /// Get the shard index for a key
578    fn shard_index(&self, key: &K) -> usize {
579        let mut hasher = std::collections::hash_map::DefaultHasher::new();
580        key.hash(&mut hasher);
581        hasher.finish() as usize % SHARD_COUNT
582    }
583
584    /// Insert a key-value pair
585    pub fn insert(&self, key: K, value: V) -> Option<V> {
586        let shard_idx = self.shard_index(&key);
587        let mut shard = self.shards[shard_idx].write();
588
589        let old = shard.insert(key, value);
590        if old.is_none() {
591            self.len.fetch_add(1, Ordering::Relaxed);
592        }
593        old
594    }
595
596    /// Get a value by key
597    pub fn get(&self, key: &K) -> Option<V> {
598        let shard_idx = self.shard_index(key);
599        let shard = self.shards[shard_idx].read();
600        shard.get(key).cloned()
601    }
602
603    /// Check if key exists
604    pub fn contains_key(&self, key: &K) -> bool {
605        let shard_idx = self.shard_index(key);
606        let shard = self.shards[shard_idx].read();
607        shard.contains_key(key)
608    }
609
610    /// Remove a key
611    pub fn remove(&self, key: &K) -> Option<V> {
612        let shard_idx = self.shard_index(key);
613        let mut shard = self.shards[shard_idx].write();
614
615        let removed = shard.remove(key);
616        if removed.is_some() {
617            self.len.fetch_sub(1, Ordering::Relaxed);
618        }
619        removed
620    }
621
622    /// Get approximate length
623    pub fn len(&self) -> usize {
624        self.len.load(Ordering::Relaxed)
625    }
626
627    /// Check if empty
628    pub fn is_empty(&self) -> bool {
629        self.len() == 0
630    }
631
632    /// Apply a function to a value
633    pub fn update<F>(&self, key: &K, f: F) -> Option<V>
634    where
635        F: FnOnce(&mut V),
636    {
637        let shard_idx = self.shard_index(key);
638        let mut shard = self.shards[shard_idx].write();
639
640        if let Some(value) = shard.get_mut(key) {
641            f(value);
642            Some(value.clone())
643        } else {
644            None
645        }
646    }
647
648    /// Get or insert with a default value
649    pub fn get_or_insert(&self, key: K, default: V) -> V {
650        let shard_idx = self.shard_index(&key);
651        let mut shard = self.shards[shard_idx].write();
652
653        if let Some(value) = shard.get(&key) {
654            value.clone()
655        } else {
656            self.len.fetch_add(1, Ordering::Relaxed);
657            shard.insert(key, default.clone());
658            default
659        }
660    }
661
662    /// Get or insert with a closure
663    pub fn get_or_insert_with<F>(&self, key: K, f: F) -> V
664    where
665        F: FnOnce() -> V,
666    {
667        let shard_idx = self.shard_index(&key);
668        let mut shard = self.shards[shard_idx].write();
669
670        if let Some(value) = shard.get(&key) {
671            value.clone()
672        } else {
673            let value = f();
674            self.len.fetch_add(1, Ordering::Relaxed);
675            shard.insert(key, value.clone());
676            value
677        }
678    }
679
680    /// Iterate over all entries (snapshot)
681    pub fn snapshot(&self) -> Vec<(K, V)> {
682        let mut entries = Vec::new();
683
684        for shard in &self.shards {
685            let shard = shard.read();
686            for (k, v) in shard.iter() {
687                entries.push((k.clone(), v.clone()));
688            }
689        }
690
691        entries
692    }
693
694    /// Clear all entries
695    pub fn clear(&self) {
696        for shard in &self.shards {
697            shard.write().clear();
698        }
699        self.len.store(0, Ordering::Relaxed);
700    }
701}
702
703impl<K: Hash + Eq + Clone, V: Clone> Default for ConcurrentHashMap<K, V> {
704    fn default() -> Self {
705        Self::new()
706    }
707}
708
// ============================================================================
// Lock-Free Skip List for Offset Indexing
// ============================================================================

/// Maximum height for skip list nodes
const MAX_HEIGHT: usize = 32;

/// A skip list node
///
/// Heap-allocated via `Box::into_raw` and linked through atomic pointers;
/// nodes are reclaimed only when the owning list is dropped (the list is
/// append-only — see `ConcurrentSkipList` docs).
struct SkipNode<K, V> {
    /// Immutable after construction.
    key: K,
    /// Interior-mutable value slot, read through a raw pointer during lookups.
    value: UnsafeCell<V>,
    /// Forward pointers for each level
    forward: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
    /// Number of levels this node participates in (used for probabilistic balancing)
    #[allow(dead_code)]
    height: usize,
}
726
727impl<K, V> SkipNode<K, V> {
728    fn new(key: K, value: V, height: usize) -> *mut Self {
729        let forward = std::array::from_fn(|_| AtomicPtr::new(std::ptr::null_mut()));
730
731        let node = Box::new(Self {
732            key,
733            value: UnsafeCell::new(value),
734            forward,
735            height,
736        });
737
738        Box::into_raw(node)
739    }
740}
741
/// A concurrent skip list optimized for offset lookups
///
/// Provides O(log n) lookups for finding messages by offset, which is
/// critical for efficient consumer fetching.
///
/// # Design: Append-Only
///
/// This skip list intentionally does **not** implement `remove()`. It is designed for
/// append-only offset indexing where:
///
/// 1. **Entries are never deleted individually** — the entire skip list is dropped
///    when the corresponding segment is compacted or deleted
/// 2. **Memory lifecycle is tied to segment lifetime** — when a segment is removed,
///    the skip list is dropped via the `Drop` implementation, which properly
///    deallocates all nodes
/// 3. **Offset indices are monotonically increasing** — no reuse of keys
///
/// This design provides better performance than a skip list with remove support:
/// - No ABA problem handling required
/// - Simpler atomic operations (no backlink management)
/// - Predictable memory layout
pub struct ConcurrentSkipList<K: Ord + Clone, V: Clone> {
    /// Sentinel head node; participates in every level and is never removed.
    head: *mut SkipNode<K, V>,
    /// Current maximum height
    max_level: AtomicUsize,
    /// Number of elements
    len: AtomicUsize,
    /// Random state for level generation (xorshift64, advanced atomically)
    rand_state: AtomicU64,
}

// SAFETY: All shared mutation goes through atomic pointer operations, and
// nodes are never freed while the list is alive (append-only design above),
// so references observed during traversal remain valid.
unsafe impl<K: Ord + Clone + Send, V: Clone + Send> Send for ConcurrentSkipList<K, V> {}
unsafe impl<K: Ord + Clone + Sync, V: Clone + Sync> Sync for ConcurrentSkipList<K, V> {}
777
778impl<K: Ord + Clone + Default, V: Clone + Default> ConcurrentSkipList<K, V> {
779    /// Create a new concurrent skip list
780    pub fn new() -> Self {
781        // Create head sentinel
782        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
783
784        // Use runtime entropy for PRNG seed (process ID, time, address)
785        let seed = Self::generate_seed();
786
787        Self {
788            head,
789            max_level: AtomicUsize::new(1),
790            len: AtomicUsize::new(0),
791            rand_state: AtomicU64::new(seed),
792        }
793    }
794
795    /// Generate a random seed using runtime entropy sources
796    fn generate_seed() -> u64 {
797        use std::collections::hash_map::RandomState;
798        use std::hash::{BuildHasher, Hasher};
799
800        // RandomState uses OS entropy (getrandom on Linux, arc4random on macOS)
801        let state = RandomState::new();
802        let mut hasher = state.build_hasher();
803
804        // Mix in additional entropy sources
805        hasher.write_u64(std::process::id().into());
806
807        if let Ok(time) = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
808            hasher.write_u64(time.as_nanos() as u64);
809        }
810
811        // Mix in address of the hasher itself for extra entropy
812        hasher.write_usize(&hasher as *const _ as usize);
813
814        // Ensure non-zero (XORShift requires non-zero seed)
815        hasher.finish().max(1)
816    }
817
818    /// Generate a random level for a new node
819    fn random_level(&self) -> usize {
820        // XORShift random number generation with atomic CAS
821        let mut level = 1;
822
823        let x = self
824            .rand_state
825            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut x| {
826                x ^= x << 13;
827                x ^= x >> 7;
828                x ^= x << 17;
829                Some(x)
830            })
831            .unwrap_or(1);
832
833        let mut bits = x;
834        while bits & 1 == 0 && level < MAX_HEIGHT {
835            level += 1;
836            bits >>= 1;
837        }
838
839        level
840    }
841
842    /// Insert a key-value pair
843    pub fn insert(&self, key: K, value: V) {
844        let height = self.random_level();
845        let new_node = SkipNode::new(key.clone(), value, height);
846
847        // Update max level if needed
848        let mut current_max = self.max_level.load(Ordering::Relaxed);
849        while height > current_max {
850            match self.max_level.compare_exchange_weak(
851                current_max,
852                height,
853                Ordering::AcqRel,
854                Ordering::Relaxed,
855            ) {
856                Ok(_) => break,
857                Err(m) => current_max = m,
858            }
859        }
860
861        // Find position and insert
862        let mut update = [std::ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
863        let mut current = self.head;
864
865        #[allow(clippy::needless_range_loop)]
866        // Intentional: unsafe pointer array requires explicit indexing
867        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
868            // SAFETY: We traverse from head which is always valid. Each `forward` pointer
869            // is either null or points to a valid SkipNode allocated by `insert`.
870            // Acquire ordering ensures we see the complete node data.
871            unsafe {
872                loop {
873                    let next = (*current).forward[i].load(Ordering::Acquire);
874                    if next.is_null() || (*next).key >= key {
875                        break;
876                    }
877                    current = next;
878                }
879                update[i] = current;
880            }
881        }
882
883        // Insert node at each level
884        #[allow(clippy::needless_range_loop)]
885        for i in 0..height {
886            // SAFETY: `pred` is either `self.head` (always valid) or a node from `update`
887            // which was found during traversal. `new_node` was just allocated above.
888            // Release ordering ensures the new node's data is visible before the pointer.
889            unsafe {
890                let pred = if update[i].is_null() {
891                    self.head
892                } else {
893                    update[i]
894                };
895                let next = (*pred).forward[i].load(Ordering::Acquire);
896                (*new_node).forward[i].store(next, Ordering::Release);
897                (*pred).forward[i].store(new_node, Ordering::Release);
898            }
899        }
900
901        self.len.fetch_add(1, Ordering::Relaxed);
902    }
903
    /// Find a value by key
    ///
    /// Standard skip-list search: starting at the highest active level,
    /// advance right while the next key is smaller than the target, then
    /// drop down a level. Expected O(log n) comparisons.
    ///
    /// Returns a clone of the stored value, or `None` if the key is absent.
    ///
    /// NOTE(review): the value is read through an `UnsafeCell` without
    /// synchronization against a concurrent writer of the same key —
    /// presumably values are never mutated after insert; confirm against
    /// `insert`'s contract.
    pub fn get(&self, key: &K) -> Option<V> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            // Acquire ordering ensures we see the node's complete data.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break; // end of this level; drop down
                    }
                    if (*next).key == *key {
                        // An exact match can be detected at any level.
                        return Some((*(*next).value.get()).clone());
                    }
                    if (*next).key > *key {
                        break; // overshot the target; drop down a level
                    }
                    current = next;
                }
            }
        }

        None
    }
931
    /// Find the greatest key less than or equal to the given key
    /// This is useful for finding the closest offset <= target
    ///
    /// Descends level by level, advancing while the next key is `<= key`
    /// and remembering the last node that satisfied the bound; that node
    /// (if any) is the floor entry. Returns cloned `(key, value)`.
    pub fn floor(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;
        // Best candidate so far: the rightmost node seen with key <= target.
        let mut result: Option<*mut SkipNode<K, V>> = None;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            // Acquire ordering ensures visibility of node data.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key <= *key {
                        // Still at or below the target: advance and remember.
                        result = Some(next);
                        current = next;
                    } else {
                        break;
                    }
                }
            }
        }

        // SAFETY: `node` was obtained from traversal and is a valid SkipNode pointer.
        result.map(|node| unsafe { ((*node).key.clone(), (*(*node).value.get()).clone()) })
    }
961
    /// Find the smallest key greater than or equal to the given key
    ///
    /// Descends to the last node whose key is strictly below `key`; after
    /// the descent, that node's level-0 successor (if any) is the ceiling
    /// entry. Returns cloned `(key, value)`.
    pub fn ceiling(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        // SAFETY: `current` is a valid node from traversal (or head).
        // The next pointer is either null or points to a valid SkipNode.
        // After the loop above, `current` is the rightmost node with key < `key`,
        // so its level-0 successor is the first node with key >= `key`.
        unsafe {
            let next = (*current).forward[0].load(Ordering::Acquire);
            if !next.is_null() {
                Some(((*next).key.clone(), (*(*next).value.get()).clone()))
            } else {
                None
            }
        }
    }
991
    /// Get number of elements
    ///
    /// Relaxed load: the count is advisory under concurrent inserts and may
    /// momentarily lag in-flight insertions (`insert` bumps it with a
    /// Relaxed `fetch_add` after linking).
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }
996
    /// Check if empty
    ///
    /// Delegates to [`Self::len`], so it carries the same advisory
    /// (Relaxed) semantics under concurrent inserts.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
1001
    /// Get a range of entries
    ///
    /// Returns up to `limit` `(key, value)` pairs whose keys lie in the
    /// inclusive range `[start, end]`, in ascending key order.
    pub fn range(&self, start: &K, end: &K, limit: usize) -> Vec<(K, V)> {
        // Cap the preallocation so a huge `limit` cannot force a large
        // up-front allocation before any entries are found.
        let mut entries = Vec::with_capacity(limit.min(1000));
        let mut current = self.head;

        // Find start position
        // Descend to the rightmost node with key < start.
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: Traversal starts from `self.head` which is always valid.
            // All forward pointers are either null or point to valid SkipNodes.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *start {
                        break;
                    }
                    current = next;
                }
            }
        }

        // Collect entries in range
        // SAFETY: `current` is valid from traversal. We iterate level-0 forward pointers
        // which are either null or point to valid SkipNodes allocated by `insert`.
        unsafe {
            let mut node = (*current).forward[0].load(Ordering::Acquire);
            while !node.is_null() && entries.len() < limit {
                if (*node).key > *end {
                    break; // past the end of the requested range
                }
                entries.push(((*node).key.clone(), (*(*node).value.get()).clone()));
                node = (*node).forward[0].load(Ordering::Acquire);
            }
        }

        entries
    }
1038}
1039
impl<K: Ord + Clone + Default, V: Clone + Default> Default for ConcurrentSkipList<K, V> {
    // NOTE(review): the extra `Default` bounds (beyond the `Ord + Clone`
    // required by the other impls) presumably exist because `new` builds a
    // head sentinel from default key/value — confirm against `new`'s
    // signature before tightening or loosening them.
    fn default() -> Self {
        Self::new()
    }
}
1045
impl<K: Ord + Clone, V: Clone> Drop for ConcurrentSkipList<K, V> {
    /// Deallocate every node, including the head sentinel.
    fn drop(&mut self) {
        // Free all nodes
        let mut current = self.head;
        // SAFETY: We have exclusive access via `&mut self`. All nodes were allocated
        // by `insert` using `Box::into_raw`. We traverse level-0 to visit every node
        // exactly once, converting them back to Box for proper deallocation.
        // Level 0 links every node, so higher-level forward pointers need no
        // separate handling. NOTE(review): this also frees `self.head` on the
        // first iteration — presumably `new` allocates it via `Box::into_raw`
        // just like `insert` does; confirm against `new`.
        unsafe {
            while !current.is_null() {
                // Capture the successor before freeing the current node.
                let next = (*current).forward[0].load(Ordering::Relaxed);
                drop(Box::from_raw(current));
                current = next;
            }
        }
    }
}
1062
1063// ============================================================================
1064// Tests
1065// ============================================================================
1066
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_lock_free_queue() {
        let queue = LockFreeQueue::<i32>::bounded(100);

        assert!(queue.is_empty());

        // Push 1, 2, 3 and expect FIFO order back out.
        for value in 1..=3 {
            queue.push(value).unwrap();
        }

        assert_eq!(queue.len(), 3);
        for expected in 1..=3 {
            assert_eq!(queue.pop(), Some(expected));
        }
        assert!(queue.is_empty());
    }

    #[test]
    fn test_lock_free_queue_concurrent() {
        let queue = Arc::new(LockFreeQueue::<i32>::bounded(1000));

        // Four producers push 250 distinct values each.
        let producers: Vec<_> = (0..4)
            .map(|i| {
                let q = Arc::clone(&queue);
                thread::spawn(move || {
                    for j in 0..250 {
                        q.push(i * 250 + j).unwrap();
                    }
                })
            })
            .collect();

        for producer in producers {
            producer.join().unwrap();
        }

        assert_eq!(queue.len(), 1000);

        // Drain everything in a single batch.
        assert_eq!(queue.pop_batch(1000).len(), 1000);
    }

    #[test]
    fn test_append_only_log() {
        let log = AppendOnlyLog::new(AppendLogConfig {
            segment_size: 1024,
            max_segments: 4,
            preallocate: true,
        });

        let first = log.append(b"hello");
        let second = log.append(b"world");

        // Offsets must be strictly increasing.
        assert!(second > first);

        let entries = log.read(first, 10);
        assert_eq!(entries.len(), 2);
        assert_eq!(&entries[0][..], b"hello");
        assert_eq!(&entries[1][..], b"world");
    }

    #[test]
    fn test_concurrent_hash_map() {
        let map = Arc::new(ConcurrentHashMap::<String, i32>::new());

        // Four writers insert 250 unique keys each.
        let writers: Vec<_> = (0..4)
            .map(|i| {
                let m = Arc::clone(&map);
                thread::spawn(move || {
                    for j in 0..250 {
                        m.insert(format!("key-{}-{}", i, j), i * 250 + j);
                    }
                })
            })
            .collect();

        for writer in writers {
            writer.join().unwrap();
        }

        assert_eq!(map.len(), 1000);

        // Spot-check the first and last inserted values.
        assert_eq!(map.get(&"key-0-0".to_string()), Some(0));
        assert_eq!(map.get(&"key-3-249".to_string()), Some(999));
    }

    #[test]
    fn test_skip_list() {
        let list = ConcurrentSkipList::<u64, String>::new();

        // Insert out of key order; lookups must still behave as if sorted.
        for &(key, name) in &[(10u64, "ten"), (20, "twenty"), (5, "five"), (15, "fifteen")] {
            list.insert(key, name.to_string());
        }

        assert_eq!(list.len(), 4);
        assert_eq!(list.get(&10), Some("ten".to_string()));
        assert_eq!(list.get(&99), None);

        // floor: greatest key <= target
        assert_eq!(list.floor(&12), Some((10, "ten".to_string())));
        assert_eq!(list.floor(&15), Some((15, "fifteen".to_string())));

        // ceiling: smallest key >= target
        assert_eq!(list.ceiling(&12), Some((15, "fifteen".to_string())));
        assert_eq!(list.ceiling(&1), Some((5, "five".to_string())));
    }

    #[test]
    fn test_skip_list_range() {
        let list = ConcurrentSkipList::<u64, String>::new();

        for i in 0..100 {
            list.insert(i * 10, format!("value-{}", i));
        }

        let window = list.range(&150, &350, 100);
        assert!(!window.is_empty());

        // Every returned key must lie within the inclusive bounds.
        assert!(window.iter().all(|(k, _)| (150..=350).contains(k)));
    }
}