fluxmap/
lib.rs

1#![doc = include_str!("../README.md")]
2//! The core, concurrent, multi-version skiplist implementation.
3//!
4//! This module provides `SkipList`, a highly concurrent data structure that serves
5//! as the foundation for FluxMap. It uses Multi-Version Concurrency Control (MVCC)
6//! to allow for non-blocking reads and high-performance writes.
7//!
8//! # Internals
9//!
10//! -   **Nodes:** The skiplist is composed of `Node`s, each representing a key.
11//! -   **Version Chains:** Each `Node` points to a linked list of `VersionNode`s.
12//!     Each `VersionNode` represents a specific version of the value for that key,
13//!     created by a specific transaction.
//! -   **MVCC:** When a value is updated, a new `VersionNode` is prepended to the
//!     chain. When a value is deleted, the latest visible `VersionNode` is marked as
//!     expired by recording the deleting transaction's ID in its `expirer_txid`.
16
17use std::borrow::Borrow;
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
21};
22
23use crossbeam_epoch::{Atomic, Guard, Owned, Shared};
24use crossbeam_utils::CachePadded;
25use dashmap::DashSet; // Added for read_trackers
26use futures::stream::Stream;
27use serde::{Serialize, de::DeserializeOwned};
28
29pub mod db;
30pub mod error;
31pub mod persistence;
32pub mod transaction;
33pub mod vacuum;
34pub use crate::transaction::{Snapshot, Transaction, TransactionManager, TxId, Version};
35pub use persistence::{DurabilityLevel, PersistenceEngine, PersistenceOptions};
36
37const DEFAULT_MAX_LEVEL: usize = 32;
38const DEFAULT_P: f64 = 0.5;
39
40/// A node in the version chain for a single key.
41struct VersionNode<V> {
42    version: Version<V>,
43    next: Atomic<VersionNode<V>>,
44}
45
46impl<V> VersionNode<V> {
47    fn new(version: Version<V>) -> Owned<Self> {
48        Owned::new(Self {
49            version,
50            next: Atomic::null(),
51        })
52    }
53}
54
55/// A node in the skiplist, representing a key and its chain of versions.
56struct Node<K, V> {
57    key: Option<K>,
58    /// An atomically-managed pointer to the head of the version chain.
59    value: Atomic<VersionNode<Arc<V>>>,
60    /// The forward pointers for each level of the skiplist.
61    next: Vec<Atomic<Node<K, V>>>,
62    /// A flag indicating that this node is logically deleted and awaiting physical removal.
63    deleted: AtomicBool,
64}
65
66impl<K, V> Node<K, V> {
67    /// Creates a new head node for a skiplist.
68    fn head(max_level: usize) -> Owned<Self> {
69        Owned::new(Node {
70            key: None,
71            value: Atomic::null(),
72            next: (0..max_level).map(|_| Atomic::null()).collect(),
73            deleted: AtomicBool::new(false),
74        })
75    }
76
77    /// Creates a new data node with a single version.
78    fn new(key: K, value: Arc<V>, level: usize, txid: TxId) -> Owned<Self> {
79        let version = Version {
80            value,
81            creator_txid: txid,
82            expirer_txid: AtomicU64::new(0), // 0 means not expired.
83        };
84        let version_node = VersionNode::new(version);
85
86        Owned::new(Node {
87            key: Some(key),
88            value: Atomic::from(version_node),
89            next: (0..level + 1).map(|_| Atomic::null()).collect(),
90            deleted: AtomicBool::new(false),
91        })
92    }
93}
94
95/// A concurrent, multi-version, transactional skiplist.
96///
97/// `SkipList` is the core data structure that stores key-value pairs. It supports
98/// highly concurrent reads and writes using Multi-Version Concurrency Control (MVCC)
99/// and Serializable Snapshot Isolation (SSI).
100pub struct SkipList<K: Eq + std::hash::Hash, V> {
101    head: CachePadded<Atomic<Node<K, V>>>,
102    max_level: CachePadded<usize>,
103    level: CachePadded<AtomicUsize>,
104    len: CachePadded<AtomicUsize>,
105    p: CachePadded<f64>,
106    tx_manager: Arc<TransactionManager<K, V>>,
107}
108
/// Outcome of one attempt inside `SkipList::insert`.
enum InsertAction {
    /// The base-level link lost a race; back off and run the whole attempt again.
    YieldAndRetry,
    /// The insert is complete.
    Return,
}
113
114impl<K, V> Default for SkipList<K, V>
115where
116    K: Ord + Clone + Send + Sync + 'static + std::hash::Hash + Eq + Serialize + DeserializeOwned,
117    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
118{
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl<K, V> SkipList<K, V>
125where
126    K: Ord + Clone + Send + Sync + 'static + std::hash::Hash + Eq + Serialize + DeserializeOwned,
127    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
128{
129    /// Creates a new, empty `SkipList` with the default max level.
130    pub fn new() -> Self {
131        Self::with_max_level(DEFAULT_MAX_LEVEL)
132    }
133
134    /// Creates a new, empty `SkipList` with a specified max level.
135    pub fn with_max_level(max_level: usize) -> Self {
136        Self::with_max_level_and_p(max_level, DEFAULT_P)
137    }
138
139    /// Creates a new, empty `SkipList` with a specified max level and probability factor.
140    pub fn with_max_level_and_p(max_level: usize, p: f64) -> Self {
141        let head = Node::head(max_level);
142        SkipList {
143            head: CachePadded::new(Atomic::from(head)),
144            max_level: CachePadded::new(max_level),
145            level: CachePadded::new(AtomicUsize::new(0)),
146            len: CachePadded::new(AtomicUsize::new(0)),
147            p: CachePadded::new(p),
148            tx_manager: Arc::new(TransactionManager::<K, V>::new()),
149        }
150    }
151
152    /// Returns a reference to the associated `TransactionManager`.
153    pub fn transaction_manager(&self) -> &Arc<TransactionManager<K, V>> {
154        &self.tx_manager
155    }
156
157    /// Returns the approximate number of keys in the skiplist.
158    ///
159    /// This is an approximation because it may not reflect in-flight additions or removals.
160    pub fn len(&self) -> usize {
161        self.len.load(Ordering::Relaxed)
162    }
163
164    /// Returns `true` if the skiplist contains no keys.
165    pub fn is_empty(&self) -> bool {
166        self.len() == 0
167    }
168
169    /// Generates a random level for a new node based on the probability factor `p`.
170    fn random_level(&self) -> usize {
171        let mut level = 0;
172        while fastrand::f64() < *self.p && level < *self.max_level - 1 {
173            level += 1;
174        }
175        level
176    }
177
178    /// Finds the predecessor node for a given key at a specific level.
179    /// This function also helps with physical removal of logically deleted nodes it encounters.
180    fn find_predecessor_at_level<'guard, Q: ?Sized>(
181        &self,
182        key: &Q,
183        mut current: Shared<'guard, Node<K, V>>,
184        level: usize,
185        guard: &'guard Guard,
186    ) -> Shared<'guard, Node<K, V>>
187    where
188        K: Borrow<Q>,
189        Q: Ord,
190    {
191        loop {
192            let next = unsafe {
193                // SAFETY: `current` is a `Shared` pointer obtained from `Atomic::from` or `load`
194                // operations, ensuring it's a valid, non-null pointer to a `Node<K, V>`.
195                // The `guard` ensures that the memory pointed to by `current` is protected
196                // from reclamation during this operation.
197                current.deref().next[level].load(Ordering::Relaxed, guard)
198            };
199
200            if let Some(next_node) = unsafe {
201                // SAFETY: `next` is a `Shared` pointer. `as_ref()` is safe as `next` is checked for null.
202                // The `guard` ensures the memory is protected.
203                next.as_ref()
204            } {
205                if next_node.deleted.load(Ordering::Acquire) {
206                    // Node is logically deleted, help physically remove it.
207                    let next_of_next = next_node.next[level].load(Ordering::Relaxed, guard);
208                    if unsafe {
209                        // SAFETY: `current` is a valid pointer as established above.
210                        // The `compare_exchange` operation is atomic and ensures memory safety.
211                        // We are attempting to swing the `next` pointer of `current` to skip over
212                        // the logically deleted `next` node.
213                        current.deref().next[level].compare_exchange(
214                            next,
215                            next_of_next,
216                            Ordering::AcqRel,
217                            Ordering::Relaxed,
218                            guard,
219                        )
220                    }
221                    .is_ok()
222                    {
223                        // Physical removal was successful.
224                        // Only decrement len and schedule for destruction at the base level
225                        // to ensure it happens exactly once per node.
226                        if level == 0 {
227                            self.len.fetch_sub(1, Ordering::Relaxed);
228                            unsafe {
229                                // SAFETY: `next` points to the unlinked node. Since we have successfully
230                                // unlinked it, no other thread will be able to reach it through the skiplist.
231                                // We can now safely schedule its memory to be reclaimed by the epoch-based
232                                // garbage collector.
233                                guard.defer_destroy(next);
234                            }
235                        }
236                    }
237                    // Retry finding the predecessor from `current`, as the list has changed.
238                    continue;
239                }
240
241                // If there is a next node and its key is less than the target key, move forward.
242                // SAFETY: `next_node` is a valid reference. `key` is `Some` for all non-head nodes.
243                // `unwrap_unchecked` is safe here because we know `next_node` is not the head node,
244                // which is the only node type where `key` is `None`.
245                if <K as Borrow<Q>>::borrow(unsafe { next_node.key.as_ref().unwrap_unchecked() })
246                    < key
247                {
248                    current = next;
249                    continue;
250                }
251            }
252
253            // Otherwise, `current` is the predecessor at this level.
254            break;
255        }
256        current
257    }
258
259    /// Finds the predecessor node for a given key by searching from the top level down.
260    fn find_optimistic_predecessor<'guard, Q: ?Sized>(
261        &self,
262        key: &Q,
263        guard: &'guard Guard,
264    ) -> Shared<'guard, Node<K, V>>
265    where
266        K: Borrow<Q>,
267        Q: Ord,
268    {
269        let head = self.head.load(Ordering::Relaxed, guard);
270        let mut predecessor = head;
271        for i in (0..*self.max_level).rev() {
272            predecessor = self.find_predecessor_at_level(key, predecessor, i, guard);
273        }
274        predecessor
275    }
276
277    /// Finds all predecessor nodes for a given key, one for each level of the skiplist.
278    fn find_predecessors<'guard, Q: ?Sized>(
279        &self,
280        key: &Q,
281        guard: &'guard Guard,
282    ) -> Vec<Shared<'guard, Node<K, V>>>
283    where
284        K: Borrow<Q>,
285        Q: Ord,
286    {
287        let head = self.head.load(Ordering::Relaxed, guard);
288        let mut predecessors = vec![Shared::null(); *self.max_level];
289        let mut current = head;
290
291        for i in (0..*self.max_level).rev() {
292            current = self.find_predecessor_at_level(key, current, i, guard);
293            predecessors[i] = current;
294        }
295        predecessors
296    }
297
298    /// Retrieves the value associated with `key` that is visible to the given `transaction`.
299    ///
300    /// This method traverses the skiplist to find the node for the given `key`. It then
301    /// walks the version chain for that node to find the most recent version that is
302    /// visible according to the transaction's `Snapshot`.
303    ///
304    /// As part of the SSI protocol, this operation adds the key to the transaction's
305    /// read set.
306    pub fn get(&self, key: &K, transaction: &Transaction<K, V>) -> Option<Arc<V>> {
307        let snapshot = &transaction.snapshot;
308        let guard = &crossbeam_epoch::pin();
309        let predecessor = self.find_optimistic_predecessor::<K>(key, guard);
310        let current = unsafe {
311            // SAFETY: `predecessor` is a `Shared` pointer to a valid `Node`. `deref()` is safe
312            // because `find_optimistic_predecessor` ensures it's not null. The `guard` protects the memory.
313            predecessor.deref().next[0].load(Ordering::Acquire, guard)
314        };
315
316        if let Some(node) = unsafe {
317            // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe as `current` is checked for null.
318            // The `guard` ensures the memory is protected.
319            current.as_ref()
320        } {
321            if unsafe {
322                // SAFETY: `node` is a valid reference. `key` is `Some` for all non-head nodes.
323                // `unwrap_unchecked` is safe as we've confirmed `node` is not the head.
324                node.key.as_ref().unwrap_unchecked()
325            } == key
326                && !node.deleted.load(Ordering::Acquire)
327            {
328                let mut current_version_ptr = node.value.load(Ordering::Acquire, guard);
329                while let Some(version_node) = unsafe {
330                    // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe as
331                    // it's checked for null. The `guard` ensures the memory is protected.
332                    current_version_ptr.as_ref()
333                } {
334                    let is_visible = snapshot.is_visible(&version_node.version, &*self.tx_manager);
335
336                    if is_visible {
337                        // Record the read for SSI conflict detection.
338                        transaction
339                            .read_set
340                            .insert(key.clone(), version_node.version.creator_txid);
341                        // Add this transaction to the read_trackers for this key
342                        self.tx_manager
343                            .read_trackers
344                            .entry(key.clone())
345                            .or_insert_with(DashSet::new)
346                            .insert(transaction.id);
347                        return Some(version_node.version.value.clone());
348                    }
349
350                    current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
351                }
352            }
353        }
354        None
355    }
356
357    /// Checks if a key exists and is visible to the given `transaction`.
358    pub fn contains_key(&self, key: &K, transaction: &Transaction<K, V>) -> bool {
359        self.get(key, transaction).is_some()
360    }
361
362    /// Links a new node into the skiplist at all its levels.
363    fn link_new_node<'guard>(
364        &self,
365        key: &K,
366        mut predecessors: Vec<Shared<'guard, Node<K, V>>>,
367        new_node_shared: Shared<'guard, Node<K, V>>,
368        new_level: usize,
369        guard: &'guard Guard,
370    ) {
371        // Link the node from level 1 up to its randomly determined level.
372        // Level 0 is handled separately by the caller.
373        for i in 1..=new_level {
374            loop {
375                let pred = predecessors[i];
376                let next_at_level = unsafe {
377                    // SAFETY: `pred` is a `Shared` pointer to a valid `Node`. `deref()` is safe
378                    // because `find_predecessors` ensures it's valid. The `guard` protects the memory.
379                    pred.deref().next[i].load(Ordering::Relaxed, guard)
380                };
381                unsafe {
382                    // SAFETY: `new_node_shared` is a `Shared` pointer to a valid `Node`.
383                    // `deref()` is safe. We are setting its forward pointer.
384                    new_node_shared.deref().next[i].store(next_at_level, Ordering::Relaxed)
385                };
386
387                if unsafe {
388                    // SAFETY: `pred` is a valid pointer. The `compare_exchange` is atomic and
389                    // safely links the new node into the list at this level.
390                    pred.deref().next[i].compare_exchange(
391                        next_at_level,
392                        new_node_shared,
393                        Ordering::AcqRel,
394                        Ordering::Acquire,
395                        guard,
396                    )
397                }
398                .is_ok()
399                {
400                    break; // Success, move to the next level.
401                }
402                // CAS failed, contention. Re-find predecessors and retry for this level.
403                predecessors = self.find_predecessors::<K>(key, guard);
404            }
405        }
406
407        self.len.fetch_add(1, Ordering::Relaxed);
408        self.level.fetch_max(new_level, Ordering::Release);
409    }
410
411    /// Inserts a key-value pair as part of a transaction.
412    ///
413    /// If the key already exists, this prepends a new version to its version chain.
414    /// If the key does not exist, this creates a new `Node` and links it into the skiplist.
415    ///
416    /// This operation adds the key to the transaction's write set for SSI conflict detection.
417    pub async fn insert(&self, key: K, value: Arc<V>, transaction: &Transaction<K, V>) {
418        transaction.write_set.insert(key.clone());
419        let new_level = self.random_level();
420
421        loop {
422            let action = {
423                let guard = &crossbeam_epoch::pin();
424                let predecessors = self.find_predecessors::<K>(&key, guard);
425                let predecessor = predecessors[0];
426
427                let next = unsafe {
428                    // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
429                    // The `guard` protects the memory.
430                    predecessor.deref().next[0].load(Ordering::Relaxed, guard)
431                };
432
433                if let Some(next_node) = unsafe {
434                    // SAFETY: `next` is a `Shared` pointer. `as_ref()` is safe as `next` is checked for null.
435                    // The `guard` ensures the memory is protected.
436                    next.as_ref()
437                } {
438                    if unsafe {
439                        // SAFETY: `next_node` is a valid reference. `key` is `Some` for all non-head nodes.
440                        next_node.key.as_ref().unwrap_unchecked()
441                    } == &key
442                    {
443                        // Key exists. Prepend a new version to the version chain.
444                        let new_version = Version {
445                            value: value.clone(),
446                            creator_txid: transaction.id,
447                            expirer_txid: AtomicU64::new(0),
448                        };
449                        let new_version_node = VersionNode::new(new_version);
450                        let new_version_node_shared = new_version_node.into_shared(guard);
451
452                        loop {
453                            let current_head_ptr = next_node.value.load(Ordering::Acquire, guard);
454                            unsafe {
455                                // SAFETY: `new_version_node_shared` is a valid `Shared` pointer.
456                                // `deref()` is safe. We are setting its `next` pointer to the current
457                                // head of the version chain.
458                                new_version_node_shared
459                                    .deref()
460                                    .next
461                                    .store(current_head_ptr, Ordering::Relaxed)
462                            };
463
464                            // Atomically swing the `value` pointer to the new version node.
465                            match next_node.value.compare_exchange(
466                                current_head_ptr,
467                                new_version_node_shared,
468                                Ordering::AcqRel,
469                                Ordering::Acquire,
470                                guard,
471                            ) {
472                                Ok(_) => break InsertAction::Return, // Success
473                                Err(_) => continue,                  // Contention, retry CAS loop
474                            }
475                        }
476                    } else {
477                        // Key does not exist, create a new node.
478                        let new_node =
479                            Node::new(key.clone(), value.clone(), new_level, transaction.id);
480                        let new_node_shared = new_node.into_shared(guard);
481
482                        unsafe {
483                            // SAFETY: `new_node_shared` is a valid `Shared` pointer. `deref()` is safe.
484                            // We are setting its level 0 forward pointer.
485                            new_node_shared.deref().next[0].store(next, Ordering::Relaxed)
486                        };
487
488                        if unsafe {
489                            // SAFETY: `predecessor` is a valid pointer. The `compare_exchange` is atomic
490                            // and safely links the new node into the base level of the list.
491                            predecessor.deref().next[0].compare_exchange(
492                                next,
493                                new_node_shared,
494                                Ordering::AcqRel,
495                                Ordering::Acquire,
496                                guard,
497                            )
498                        }
499                        .is_err()
500                        {
501                            InsertAction::YieldAndRetry // Contention, retry whole operation.
502                        } else {
503                            // Link the node at higher levels.
504                            self.link_new_node(
505                                &key,
506                                predecessors,
507                                new_node_shared,
508                                new_level,
509                                guard,
510                            );
511                            InsertAction::Return
512                        }
513                    }
514                } else {
515                    // List is empty or we are at the end. Create a new node.
516                    let new_node = Node::new(key.clone(), value.clone(), new_level, transaction.id);
517                    let new_node_shared = new_node.into_shared(guard);
518
519                    unsafe {
520                        // SAFETY: `new_node_shared` is a valid `Shared` pointer. `deref()` is safe.
521                        new_node_shared.deref().next[0].store(next, Ordering::Relaxed)
522                    };
523
524                    if unsafe {
525                        // SAFETY: `predecessor` is a valid pointer. The `compare_exchange` is atomic.
526                        predecessor.deref().next[0].compare_exchange(
527                            next,
528                            new_node_shared,
529                            Ordering::AcqRel,
530                            Ordering::Acquire,
531                            guard,
532                        )
533                    }
534                    .is_err()
535                    {
536                        InsertAction::YieldAndRetry
537                    } else {
538                        self.link_new_node(&key, predecessors, new_node_shared, new_level, guard);
539                        InsertAction::Return
540                    }
541                }
542            };
543
544            match action {
545                InsertAction::YieldAndRetry => {
546                    // Implement exponential backoff
547                    let mut attempts = 0;
548                    loop {
549                        attempts += 1;
550                        if attempts < 5 {
551                            // Spin for a few attempts
552                            std::thread::yield_now();
553                        } else {
554                            // Then yield to Tokio runtime with increasing delay
555                            let delay_ms = 2u64.pow(attempts - 5); // Exponential delay
556                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms.min(100)))
557                                .await; // Cap delay at 100ms
558                        }
559                        // Break from the inner loop to let the outer loop re-run the whole insert logic.
560                        break;
561                    }
562                    continue; // Continue the outer loop to retry the insert operation
563                }
564                InsertAction::Return => return,
565            }
566        }
567    }
568
569    /// Logically removes a key as part of a transaction.
570    ///
571    /// This finds the latest visible version of the key and atomically sets its
572    /// `expirer_txid` to the current transaction's ID. The actual data is not
573    /// removed until the vacuum process runs.
574    ///
575    /// This operation adds the key to the transaction's write set.
576    ///
577    /// # Returns
578    ///
579    /// Returns the value that was removed if a visible version was found, otherwise `None`.
580    pub async fn remove(&self, key: &K, transaction: &Transaction<K, V>) -> Option<Arc<V>> {
581        transaction.write_set.insert(key.clone());
582        let transaction_id = transaction.id;
583
584        let guard = &crossbeam_epoch::pin();
585        let predecessor = self.find_optimistic_predecessor::<K>(key, guard);
586        let node_ptr = unsafe {
587            // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
588            // The `guard` protects the memory.
589            predecessor.deref().next[0].load(Ordering::Acquire, guard)
590        };
591
592        if let Some(node) = unsafe {
593            // SAFETY: `node_ptr` is a `Shared` pointer. `as_ref()` is safe as it's checked for null.
594            // The `guard` protects the memory.
595            node_ptr.as_ref()
596        } {
597            if unsafe {
598                // SAFETY: `node` is a valid reference. `key` is `Some` for all non-head nodes.
599                node.key.as_ref().unwrap_unchecked()
600            } != key
601            {
602                return None; // Key not found.
603            }
604
605            // If the node is already marked as deleted by the vacuum, we can't do anything.
606            if node.deleted.load(Ordering::Acquire) {
607                return None;
608            }
609
610            let mut version_ptr = node.value.load(Ordering::Acquire, guard);
611            while let Some(version_node) = unsafe {
612                // SAFETY: `version_ptr` is a `Shared` pointer. `as_ref()` is safe as it's checked for null.
613                // The `guard` protects the memory.
614                version_ptr.as_ref()
615            } {
616                // Check if the version is visible to the current transaction.
617                let is_visible = transaction
618                    .snapshot
619                    .is_visible(&version_node.version, &*self.tx_manager);
620
621                if is_visible {
622                    // This is a version we can try to expire.
623                    // Atomically set the expirer_txid from 0 to our transaction ID.
624                    match version_node.version.expirer_txid.compare_exchange(
625                        0,
626                        transaction_id,
627                        Ordering::AcqRel,
628                        Ordering::Acquire,
629                    ) {
630                        Ok(_) => {
631                            // Success! We expired this version.
632                            return Some(version_node.version.value.clone());
633                        }
634                        Err(_) => {
635                            // CAS failed. Another concurrent transaction just expired it.
636                            // This version is no longer visible to us.
637                            // We continue the loop to find the next visible version.
638                        }
639                    }
640                }
641
642                // Move to the next version in the chain.
643                version_ptr = version_node.next.load(Ordering::Acquire, guard);
644            }
645
646            // If we reach here, no visible version was found or we lost all races.
647            return None;
648        } else {
649            return None; // Node not found.
650        }
651    }
652
653    /// Scans a range of keys and returns the visible versions as a `Vec`.
654    pub fn range(&self, start: &K, end: &K, snapshot: &Snapshot) -> Vec<(K, Arc<V>)> {
655        let guard = &crossbeam_epoch::pin();
656        let mut results = Vec::new();
657
658        let predecessor = self.find_optimistic_predecessor::<K>(start, guard);
659        let mut current = unsafe {
660            // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
661            // The `guard` protects the memory.
662            predecessor.deref().next[0].load(Ordering::Acquire, guard)
663        };
664
665        loop {
666            if let Some(node_ref) = unsafe {
667                // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe as it's checked for null.
668                // The `guard` protects the memory.
669                current.as_ref()
670            } {
671                if node_ref.deleted.load(Ordering::Acquire) {
672                    current = node_ref.next[0].load(Ordering::Acquire, guard);
673                    continue;
674                }
675                let key = unsafe {
676                    // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
677                    node_ref.key.as_ref().unwrap_unchecked()
678                };
679                if key > end {
680                    break;
681                }
682                if key >= start {
683                    let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
684                    while let Some(version_node) = unsafe {
685                        // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
686                        // The `guard` protects the memory.
687                        current_version_ptr.as_ref()
688                    } {
689                        let is_visible =
690                            snapshot.is_visible(&version_node.version, &*self.tx_manager);
691
692                        if is_visible {
693                            results.push((key.clone(), version_node.version.value.clone()));
694                            break; // Found the visible version for this key, move to next key
695                        }
696                        current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
697                    }
698                }
699                current = node_ref.next[0].load(Ordering::Acquire, guard);
700            } else {
701                break;
702            }
703        }
704        results
705    }
706
707    /// Returns a stream that yields visible key-value pairs within a given range.
708    pub fn range_stream<'a>(
709        &'a self,
710        start: &'a K,
711        end: &'a K,
712        snapshot: &'a Snapshot,
713    ) -> impl Stream<Item = (K, Arc<V>)> + 'a {
714        // Use async_stream to create a true streaming iterator
715        async_stream::stream! {
716            let guard = &crossbeam_epoch::pin();
717            let predecessor = self.find_optimistic_predecessor::<K>(start, guard);
718            let mut current = unsafe {
719                // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
720                // The `guard` protects the memory.
721                predecessor.deref().next[0].load(Ordering::Acquire, guard)
722            };
723
724            loop {
725                if let Some(node_ref) = unsafe {
726                    // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe.
727                    // The `guard` protects the memory.
728                    current.as_ref()
729                } {
730                    if node_ref.deleted.load(Ordering::Acquire) {
731                        current = node_ref.next[0].load(Ordering::Acquire, guard);
732                        continue;
733                    }
734                    let key = unsafe {
735                        // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
736                        node_ref.key.as_ref().unwrap_unchecked()
737                    };
738                    if key > end {
739                        break;
740                    }
741                    if key >= start {
742                        let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
743                        while let Some(version_node) = unsafe {
744                            // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
745                            // The `guard` protects the memory.
746                            current_version_ptr.as_ref()
747                        } {
748                            let is_visible = snapshot.is_visible(&version_node.version, &*self.tx_manager);
749
750                            if is_visible {
751                                yield (key.clone(), version_node.version.value.clone());
752                                break; // Found the visible version for this key, move to next key
753                            }
754                            current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
755                        }
756                    }
757                    current = node_ref.next[0].load(Ordering::Acquire, guard);
758                } else {
759                    break;
760                }
761            }
762        }
763    }
764}
765
766impl<K, V> SkipList<K, V>
767where
768    K: Ord
769        + Clone
770        + Send
771        + Sync
772        + 'static
773        + Borrow<str>
774        + std::hash::Hash
775        + Eq
776        + Serialize
777        + DeserializeOwned,
778    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
779{
780    /// Scans for keys starting with a given prefix and returns the visible versions as a `Vec`.
781    pub fn prefix_scan(&self, prefix: &str, snapshot: &Snapshot) -> Vec<(K, Arc<V>)> {
782        let guard = &crossbeam_epoch::pin();
783        let mut results = Vec::new();
784
785        let predecessor = self.find_optimistic_predecessor::<str>(prefix, guard);
786        let mut current = unsafe {
787            // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
788            // The `guard` protects the memory.
789            predecessor.deref().next[0].load(Ordering::Acquire, guard)
790        };
791
792        loop {
793            if let Some(node_ref) = unsafe {
794                // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe.
795                // The `guard` protects the memory.
796                current.as_ref()
797            } {
798                if node_ref.deleted.load(Ordering::Acquire) {
799                    current = node_ref.next[0].load(Ordering::Acquire, guard);
800                    continue;
801                }
802
803                let key = unsafe {
804                    // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
805                    node_ref.key.as_ref().unwrap_unchecked()
806                };
807                if key.borrow().starts_with(prefix) {
808                    let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
809                    while let Some(version_node) = unsafe {
810                        // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
811                        // The `guard` protects the memory.
812                        current_version_ptr.as_ref()
813                    } {
814                        let is_visible =
815                            snapshot.is_visible(&version_node.version, &*self.tx_manager);
816
817                        if is_visible {
818                            results.push((key.clone(), version_node.version.value.clone()));
819                            break; // Found the visible version for this key, move to next key
820                        }
821                        current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
822                    }
823                } else {
824                    // Since the skiplist is sorted, once we find a key that doesn't
825                    // have the prefix, no subsequent keys will either.
826                    break;
827                }
828                current = node_ref.next[0].load(Ordering::Acquire, guard);
829            } else {
830                break;
831            }
832        }
833        results
834    }
835
836    /// Returns a stream that yields visible key-value pairs for keys starting with a given prefix.
837    pub fn prefix_scan_stream<'a>(
838        &'a self,
839        prefix: &'a str,
840        snapshot: &'a Snapshot,
841    ) -> impl Stream<Item = (K, Arc<V>)> + 'a {
842        // Use async_stream to create a true streaming iterator
843        async_stream::stream! {
844            let guard = &crossbeam_epoch::pin();
845            let predecessor = self.find_optimistic_predecessor::<str>(prefix, guard);
846            let mut current = unsafe {
847                // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
848                // The `guard` protects the memory.
849                predecessor.deref().next[0].load(Ordering::Acquire, guard)
850            };
851
852            loop {
853                if let Some(node_ref) = unsafe {
854                    // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe.
855                    // The `guard` protects the memory.
856                    current.as_ref()
857                } {
858                    if node_ref.deleted.load(Ordering::Acquire) {
859                        current = node_ref.next[0].load(Ordering::Acquire, guard);
860                        continue;
861                    }
862
863                    let key = unsafe {
864                        // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
865                        node_ref.key.as_ref().unwrap_unchecked()
866                    };
867                    if key.borrow().starts_with(prefix) {
868                        let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
869                        while let Some(version_node) = unsafe {
870                            // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
871                            // The `guard` protects the memory.
872                            current_version_ptr.as_ref()
873                        } {
874                            let is_visible = snapshot.is_visible(&version_node.version, &*self.tx_manager);
875
876                            if is_visible {
877                                yield (key.clone(), version_node.version.value.clone());
878                                break; // Found the visible version for this key, move to next key
879                            }
880                            current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
881                        }
882                    } else {
883                        break;
884                    }
885                    current = node_ref.next[0].load(Ordering::Acquire, guard);
886                } else {
887                    break;
888                }
889            }
890        }
891    }
892}