// fluxmap/lib.rs
1#![doc = include_str!("../README.md")]
2//! The core, concurrent, multi-version skiplist implementation.
3//!
4//! This module provides `SkipList`, a highly concurrent data structure that serves
5//! as the foundation for FluxMap. It uses Multi-Version Concurrency Control (MVCC)
6//! to allow for non-blocking reads and high-performance writes.
7//!
8//! # Internals
9//!
10//! - **Nodes:** The skiplist is composed of `Node`s, each representing a key.
11//! - **Version Chains:** Each `Node` points to a linked list of `VersionNode`s.
12//! Each `VersionNode` represents a specific version of the value for that key,
13//! created by a specific transaction.
14//! - **MVCC:** When a value is updated, a new `VersionNode` is prepended to the
//! chain. When a value is deleted, the most recent `VersionNode` is marked as
//! expired by atomically stamping its `expirer_txid` with the deleting
//! transaction's ID; the data is physically reclaimed later by the vacuum.
16
17use std::borrow::Borrow;
18use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
21};
22
23use crossbeam_epoch::{Atomic, Guard, Owned, Shared};
24use crossbeam_utils::CachePadded;
25use dashmap::DashSet; // Added for read_trackers
26use futures::stream::Stream;
27use serde::{Serialize, de::DeserializeOwned};
28
29pub mod db;
30pub mod error;
31pub mod persistence;
32pub mod transaction;
33pub mod vacuum;
34pub use crate::transaction::{Snapshot, Transaction, TransactionManager, TxId, Version};
35pub use persistence::{DurabilityLevel, PersistenceEngine, PersistenceOptions};
36
37const DEFAULT_MAX_LEVEL: usize = 32;
38const DEFAULT_P: f64 = 0.5;
39
/// A node in the version chain for a single key.
///
/// Chains are singly linked, newest-first: `insert` prepends a fresh
/// `VersionNode` at the head, so readers walking `next` see versions from
/// most recent to oldest.
struct VersionNode<V> {
    /// The versioned value together with its MVCC metadata
    /// (`creator_txid` / `expirer_txid`).
    version: Version<V>,
    /// The next (older) version in the chain; null for the oldest version.
    next: Atomic<VersionNode<V>>,
}
45
46impl<V> VersionNode<V> {
47 fn new(version: Version<V>) -> Owned<Self> {
48 Owned::new(Self {
49 version,
50 next: Atomic::null(),
51 })
52 }
53}
54
/// A node in the skiplist, representing a key and its chain of versions.
struct Node<K, V> {
    /// The key; `None` only for the sentinel head node.
    key: Option<K>,
    /// An atomically-managed pointer to the head of the version chain.
    value: Atomic<VersionNode<Arc<V>>>,
    /// The forward pointers for each level of the skiplist.
    next: Vec<Atomic<Node<K, V>>>,
    /// A flag indicating that this node is logically deleted and awaiting physical removal.
    deleted: AtomicBool,
}
65
66impl<K, V> Node<K, V> {
67 /// Creates a new head node for a skiplist.
68 fn head(max_level: usize) -> Owned<Self> {
69 Owned::new(Node {
70 key: None,
71 value: Atomic::null(),
72 next: (0..max_level).map(|_| Atomic::null()).collect(),
73 deleted: AtomicBool::new(false),
74 })
75 }
76
77 /// Creates a new data node with a single version.
78 fn new(key: K, value: Arc<V>, level: usize, txid: TxId) -> Owned<Self> {
79 let version = Version {
80 value,
81 creator_txid: txid,
82 expirer_txid: AtomicU64::new(0), // 0 means not expired.
83 };
84 let version_node = VersionNode::new(version);
85
86 Owned::new(Node {
87 key: Some(key),
88 value: Atomic::from(version_node),
89 next: (0..level + 1).map(|_| Atomic::null()).collect(),
90 deleted: AtomicBool::new(false),
91 })
92 }
93}
94
/// A concurrent, multi-version, transactional skiplist.
///
/// `SkipList` is the core data structure that stores key-value pairs. It supports
/// highly concurrent reads and writes using Multi-Version Concurrency Control (MVCC)
/// and Serializable Snapshot Isolation (SSI).
///
/// Every field is wrapped in `CachePadded` so the hot counters do not share
/// cache lines (avoids false sharing under contention).
pub struct SkipList<K: Eq + std::hash::Hash, V> {
    /// Sentinel head node; all traversals start here at the top level.
    head: CachePadded<Atomic<Node<K, V>>>,
    /// Maximum number of levels a node may span (fixed at construction).
    max_level: CachePadded<usize>,
    /// Highest level currently in use (raised monotonically by inserts).
    level: CachePadded<AtomicUsize>,
    /// Approximate count of live keys; see `len` for caveats.
    len: CachePadded<AtomicUsize>,
    /// Probability of promoting a node one more level (geometric heights).
    p: CachePadded<f64>,
    /// Transaction manager used for snapshot visibility and SSI read tracking.
    tx_manager: Arc<TransactionManager<K, V>>,
}
108
/// Outcome of a single attempt of the lock-free insert loop.
enum InsertAction {
    /// A CAS lost a race with a concurrent writer; back off and retry the
    /// whole insert from scratch.
    YieldAndRetry,
    /// The insert completed successfully; return to the caller.
    Return,
}
113
114impl<K, V> Default for SkipList<K, V>
115where
116 K: Ord + Clone + Send + Sync + 'static + std::hash::Hash + Eq + Serialize + DeserializeOwned,
117 V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
118{
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124impl<K, V> SkipList<K, V>
125where
126 K: Ord + Clone + Send + Sync + 'static + std::hash::Hash + Eq + Serialize + DeserializeOwned,
127 V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
128{
129 /// Creates a new, empty `SkipList` with the default max level.
130 pub fn new() -> Self {
131 Self::with_max_level(DEFAULT_MAX_LEVEL)
132 }
133
134 /// Creates a new, empty `SkipList` with a specified max level.
135 pub fn with_max_level(max_level: usize) -> Self {
136 Self::with_max_level_and_p(max_level, DEFAULT_P)
137 }
138
139 /// Creates a new, empty `SkipList` with a specified max level and probability factor.
140 pub fn with_max_level_and_p(max_level: usize, p: f64) -> Self {
141 let head = Node::head(max_level);
142 SkipList {
143 head: CachePadded::new(Atomic::from(head)),
144 max_level: CachePadded::new(max_level),
145 level: CachePadded::new(AtomicUsize::new(0)),
146 len: CachePadded::new(AtomicUsize::new(0)),
147 p: CachePadded::new(p),
148 tx_manager: Arc::new(TransactionManager::<K, V>::new()),
149 }
150 }
151
152 /// Returns a reference to the associated `TransactionManager`.
153 pub fn transaction_manager(&self) -> &Arc<TransactionManager<K, V>> {
154 &self.tx_manager
155 }
156
157 /// Returns the approximate number of keys in the skiplist.
158 ///
159 /// This is an approximation because it may not reflect in-flight additions or removals.
160 pub fn len(&self) -> usize {
161 self.len.load(Ordering::Relaxed)
162 }
163
164 /// Returns `true` if the skiplist contains no keys.
165 pub fn is_empty(&self) -> bool {
166 self.len() == 0
167 }
168
169 /// Generates a random level for a new node based on the probability factor `p`.
170 fn random_level(&self) -> usize {
171 let mut level = 0;
172 while fastrand::f64() < *self.p && level < *self.max_level - 1 {
173 level += 1;
174 }
175 level
176 }
177
    /// Finds the predecessor node for a given key at a specific level.
    /// This function also helps with physical removal of logically deleted nodes it encounters.
    ///
    /// Returns the last node at `level` (possibly the head) whose key is
    /// strictly less than `key`. After any successful unlink the scan retries
    /// from `current`, so the returned pointer reflects the list as last seen.
    fn find_predecessor_at_level<'guard, Q: ?Sized>(
        &self,
        key: &Q,
        mut current: Shared<'guard, Node<K, V>>,
        level: usize,
        guard: &'guard Guard,
    ) -> Shared<'guard, Node<K, V>>
    where
        K: Borrow<Q>,
        Q: Ord,
    {
        loop {
            let next = unsafe {
                // SAFETY: `current` is a `Shared` pointer obtained from `Atomic::from` or `load`
                // operations, ensuring it's a valid, non-null pointer to a `Node<K, V>`.
                // The `guard` ensures that the memory pointed to by `current` is protected
                // from reclamation during this operation.
                current.deref().next[level].load(Ordering::Relaxed, guard)
            };

            if let Some(next_node) = unsafe {
                // SAFETY: `next` is a `Shared` pointer. `as_ref()` is safe as `next` is checked for null.
                // The `guard` ensures the memory is protected.
                next.as_ref()
            } {
                if next_node.deleted.load(Ordering::Acquire) {
                    // Node is logically deleted, help physically remove it.
                    let next_of_next = next_node.next[level].load(Ordering::Relaxed, guard);
                    if unsafe {
                        // SAFETY: `current` is a valid pointer as established above.
                        // The `compare_exchange` operation is atomic and ensures memory safety.
                        // We are attempting to swing the `next` pointer of `current` to skip over
                        // the logically deleted `next` node.
                        current.deref().next[level].compare_exchange(
                            next,
                            next_of_next,
                            Ordering::AcqRel,
                            Ordering::Relaxed,
                            guard,
                        )
                    }
                    .is_ok()
                    {
                        // Physical removal was successful.
                        // Only decrement len and schedule for destruction at the base level
                        // to ensure it happens exactly once per node.
                        if level == 0 {
                            self.len.fetch_sub(1, Ordering::Relaxed);
                            unsafe {
                                // SAFETY: `next` points to the unlinked node. Since we have successfully
                                // unlinked it, no other thread will be able to reach it through the skiplist.
                                // We can now safely schedule its memory to be reclaimed by the epoch-based
                                // garbage collector.
                                guard.defer_destroy(next);
                            }
                        }
                    }
                    // Retry finding the predecessor from `current`, as the list has changed.
                    // Whether the CAS won or lost, `current`'s successor changed.
                    continue;
                }

                // If there is a next node and its key is less than the target key, move forward.
                // SAFETY: `next_node` is a valid reference. `key` is `Some` for all non-head nodes.
                // `unwrap_unchecked` is safe here because we know `next_node` is not the head node,
                // which is the only node type where `key` is `None`.
                if <K as Borrow<Q>>::borrow(unsafe { next_node.key.as_ref().unwrap_unchecked() })
                    < key
                {
                    current = next;
                    continue;
                }
            }

            // Otherwise, `current` is the predecessor at this level.
            break;
        }
        current
    }
258
259 /// Finds the predecessor node for a given key by searching from the top level down.
260 fn find_optimistic_predecessor<'guard, Q: ?Sized>(
261 &self,
262 key: &Q,
263 guard: &'guard Guard,
264 ) -> Shared<'guard, Node<K, V>>
265 where
266 K: Borrow<Q>,
267 Q: Ord,
268 {
269 let head = self.head.load(Ordering::Relaxed, guard);
270 let mut predecessor = head;
271 for i in (0..*self.max_level).rev() {
272 predecessor = self.find_predecessor_at_level(key, predecessor, i, guard);
273 }
274 predecessor
275 }
276
277 /// Finds all predecessor nodes for a given key, one for each level of the skiplist.
278 fn find_predecessors<'guard, Q: ?Sized>(
279 &self,
280 key: &Q,
281 guard: &'guard Guard,
282 ) -> Vec<Shared<'guard, Node<K, V>>>
283 where
284 K: Borrow<Q>,
285 Q: Ord,
286 {
287 let head = self.head.load(Ordering::Relaxed, guard);
288 let mut predecessors = vec![Shared::null(); *self.max_level];
289 let mut current = head;
290
291 for i in (0..*self.max_level).rev() {
292 current = self.find_predecessor_at_level(key, current, i, guard);
293 predecessors[i] = current;
294 }
295 predecessors
296 }
297
    /// Retrieves the value associated with `key` that is visible to the given `transaction`.
    ///
    /// This method traverses the skiplist to find the node for the given `key`. It then
    /// walks the version chain for that node to find the most recent version that is
    /// visible according to the transaction's `Snapshot`.
    ///
    /// As part of the SSI protocol, this operation adds the key to the transaction's
    /// read set and registers the transaction in the key's `read_trackers`.
    ///
    /// Returns `None` if the key is absent, logically deleted, or has no
    /// version visible to this snapshot.
    pub fn get(&self, key: &K, transaction: &Transaction<K, V>) -> Option<Arc<V>> {
        let snapshot = &transaction.snapshot;
        let guard = &crossbeam_epoch::pin();
        let predecessor = self.find_optimistic_predecessor::<K>(key, guard);
        // The candidate node is the level-0 successor of the predecessor.
        let current = unsafe {
            // SAFETY: `predecessor` is a `Shared` pointer to a valid `Node`. `deref()` is safe
            // because `find_optimistic_predecessor` ensures it's not null. The `guard` protects the memory.
            predecessor.deref().next[0].load(Ordering::Acquire, guard)
        };

        if let Some(node) = unsafe {
            // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe as `current` is checked for null.
            // The `guard` ensures the memory is protected.
            current.as_ref()
        } {
            if unsafe {
                // SAFETY: `node` is a valid reference. `key` is `Some` for all non-head nodes.
                // `unwrap_unchecked` is safe as we've confirmed `node` is not the head.
                node.key.as_ref().unwrap_unchecked()
            } == key
                && !node.deleted.load(Ordering::Acquire)
            {
                // Walk the version chain newest-first; the first visible
                // version is the one this snapshot should observe.
                let mut current_version_ptr = node.value.load(Ordering::Acquire, guard);
                while let Some(version_node) = unsafe {
                    // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe as
                    // it's checked for null. The `guard` ensures the memory is protected.
                    current_version_ptr.as_ref()
                } {
                    let is_visible = snapshot.is_visible(&version_node.version, &*self.tx_manager);

                    if is_visible {
                        // Record the read for SSI conflict detection.
                        transaction
                            .read_set
                            .insert(key.clone(), version_node.version.creator_txid);
                        // Add this transaction to the read_trackers for this key
                        self.tx_manager
                            .read_trackers
                            .entry(key.clone())
                            .or_insert_with(DashSet::new)
                            .insert(transaction.id);
                        return Some(version_node.version.value.clone());
                    }

                    current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
                }
            }
        }
        None
    }
356
357 /// Checks if a key exists and is visible to the given `transaction`.
358 pub fn contains_key(&self, key: &K, transaction: &Transaction<K, V>) -> bool {
359 self.get(key, transaction).is_some()
360 }
361
    /// Links a new node into the skiplist at all its levels.
    ///
    /// The caller has already linked the node at level 0; this routine splices
    /// it in at levels 1..=`new_level` with per-level CAS retries, then bumps
    /// the key count and (if needed) the list's current level.
    fn link_new_node<'guard>(
        &self,
        key: &K,
        mut predecessors: Vec<Shared<'guard, Node<K, V>>>,
        new_node_shared: Shared<'guard, Node<K, V>>,
        new_level: usize,
        guard: &'guard Guard,
    ) {
        // Link the node from level 1 up to its randomly determined level.
        // Level 0 is handled separately by the caller.
        for i in 1..=new_level {
            loop {
                let pred = predecessors[i];
                let next_at_level = unsafe {
                    // SAFETY: `pred` is a `Shared` pointer to a valid `Node`. `deref()` is safe
                    // because `find_predecessors` ensures it's valid. The `guard` protects the memory.
                    pred.deref().next[i].load(Ordering::Relaxed, guard)
                };
                unsafe {
                    // SAFETY: `new_node_shared` is a `Shared` pointer to a valid `Node`.
                    // `deref()` is safe. We are setting its forward pointer.
                    new_node_shared.deref().next[i].store(next_at_level, Ordering::Relaxed)
                };

                if unsafe {
                    // SAFETY: `pred` is a valid pointer. The `compare_exchange` is atomic and
                    // safely links the new node into the list at this level.
                    pred.deref().next[i].compare_exchange(
                        next_at_level,
                        new_node_shared,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                        guard,
                    )
                }
                .is_ok()
                {
                    break; // Success, move to the next level.
                }
                // CAS failed, contention. Re-find predecessors and retry for this level.
                predecessors = self.find_predecessors::<K>(key, guard);
            }
        }

        // The node is fully linked: account for it exactly once.
        self.len.fetch_add(1, Ordering::Relaxed);
        self.level.fetch_max(new_level, Ordering::Release);
    }
410
411 /// Inserts a key-value pair as part of a transaction.
412 ///
413 /// If the key already exists, this prepends a new version to its version chain.
414 /// If the key does not exist, this creates a new `Node` and links it into the skiplist.
415 ///
416 /// This operation adds the key to the transaction's write set for SSI conflict detection.
417 pub async fn insert(&self, key: K, value: Arc<V>, transaction: &Transaction<K, V>) {
418 transaction.write_set.insert(key.clone());
419 let new_level = self.random_level();
420
421 loop {
422 let action = {
423 let guard = &crossbeam_epoch::pin();
424 let predecessors = self.find_predecessors::<K>(&key, guard);
425 let predecessor = predecessors[0];
426
427 let next = unsafe {
428 // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
429 // The `guard` protects the memory.
430 predecessor.deref().next[0].load(Ordering::Relaxed, guard)
431 };
432
433 if let Some(next_node) = unsafe {
434 // SAFETY: `next` is a `Shared` pointer. `as_ref()` is safe as `next` is checked for null.
435 // The `guard` ensures the memory is protected.
436 next.as_ref()
437 } {
438 if unsafe {
439 // SAFETY: `next_node` is a valid reference. `key` is `Some` for all non-head nodes.
440 next_node.key.as_ref().unwrap_unchecked()
441 } == &key
442 {
443 // Key exists. Prepend a new version to the version chain.
444 let new_version = Version {
445 value: value.clone(),
446 creator_txid: transaction.id,
447 expirer_txid: AtomicU64::new(0),
448 };
449 let new_version_node = VersionNode::new(new_version);
450 let new_version_node_shared = new_version_node.into_shared(guard);
451
452 loop {
453 let current_head_ptr = next_node.value.load(Ordering::Acquire, guard);
454 unsafe {
455 // SAFETY: `new_version_node_shared` is a valid `Shared` pointer.
456 // `deref()` is safe. We are setting its `next` pointer to the current
457 // head of the version chain.
458 new_version_node_shared
459 .deref()
460 .next
461 .store(current_head_ptr, Ordering::Relaxed)
462 };
463
464 // Atomically swing the `value` pointer to the new version node.
465 match next_node.value.compare_exchange(
466 current_head_ptr,
467 new_version_node_shared,
468 Ordering::AcqRel,
469 Ordering::Acquire,
470 guard,
471 ) {
472 Ok(_) => break InsertAction::Return, // Success
473 Err(_) => continue, // Contention, retry CAS loop
474 }
475 }
476 } else {
477 // Key does not exist, create a new node.
478 let new_node =
479 Node::new(key.clone(), value.clone(), new_level, transaction.id);
480 let new_node_shared = new_node.into_shared(guard);
481
482 unsafe {
483 // SAFETY: `new_node_shared` is a valid `Shared` pointer. `deref()` is safe.
484 // We are setting its level 0 forward pointer.
485 new_node_shared.deref().next[0].store(next, Ordering::Relaxed)
486 };
487
488 if unsafe {
489 // SAFETY: `predecessor` is a valid pointer. The `compare_exchange` is atomic
490 // and safely links the new node into the base level of the list.
491 predecessor.deref().next[0].compare_exchange(
492 next,
493 new_node_shared,
494 Ordering::AcqRel,
495 Ordering::Acquire,
496 guard,
497 )
498 }
499 .is_err()
500 {
501 InsertAction::YieldAndRetry // Contention, retry whole operation.
502 } else {
503 // Link the node at higher levels.
504 self.link_new_node(
505 &key,
506 predecessors,
507 new_node_shared,
508 new_level,
509 guard,
510 );
511 InsertAction::Return
512 }
513 }
514 } else {
515 // List is empty or we are at the end. Create a new node.
516 let new_node = Node::new(key.clone(), value.clone(), new_level, transaction.id);
517 let new_node_shared = new_node.into_shared(guard);
518
519 unsafe {
520 // SAFETY: `new_node_shared` is a valid `Shared` pointer. `deref()` is safe.
521 new_node_shared.deref().next[0].store(next, Ordering::Relaxed)
522 };
523
524 if unsafe {
525 // SAFETY: `predecessor` is a valid pointer. The `compare_exchange` is atomic.
526 predecessor.deref().next[0].compare_exchange(
527 next,
528 new_node_shared,
529 Ordering::AcqRel,
530 Ordering::Acquire,
531 guard,
532 )
533 }
534 .is_err()
535 {
536 InsertAction::YieldAndRetry
537 } else {
538 self.link_new_node(&key, predecessors, new_node_shared, new_level, guard);
539 InsertAction::Return
540 }
541 }
542 };
543
544 match action {
545 InsertAction::YieldAndRetry => {
546 // Implement exponential backoff
547 let mut attempts = 0;
548 loop {
549 attempts += 1;
550 if attempts < 5 {
551 // Spin for a few attempts
552 std::thread::yield_now();
553 } else {
554 // Then yield to Tokio runtime with increasing delay
555 let delay_ms = 2u64.pow(attempts - 5); // Exponential delay
556 tokio::time::sleep(std::time::Duration::from_millis(delay_ms.min(100)))
557 .await; // Cap delay at 100ms
558 }
559 // Break from the inner loop to let the outer loop re-run the whole insert logic.
560 break;
561 }
562 continue; // Continue the outer loop to retry the insert operation
563 }
564 InsertAction::Return => return,
565 }
566 }
567 }
568
    /// Logically removes a key as part of a transaction.
    ///
    /// This finds the latest visible version of the key and atomically sets its
    /// `expirer_txid` to the current transaction's ID. The actual data is not
    /// removed until the vacuum process runs.
    ///
    /// This operation adds the key to the transaction's write set.
    ///
    /// Note: declared `async` for interface symmetry with `insert`; the body
    /// currently never awaits.
    ///
    /// # Returns
    ///
    /// Returns the value that was removed if a visible version was found, otherwise `None`.
    pub async fn remove(&self, key: &K, transaction: &Transaction<K, V>) -> Option<Arc<V>> {
        transaction.write_set.insert(key.clone());
        let transaction_id = transaction.id;

        let guard = &crossbeam_epoch::pin();
        let predecessor = self.find_optimistic_predecessor::<K>(key, guard);
        let node_ptr = unsafe {
            // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
            // The `guard` protects the memory.
            predecessor.deref().next[0].load(Ordering::Acquire, guard)
        };

        if let Some(node) = unsafe {
            // SAFETY: `node_ptr` is a `Shared` pointer. `as_ref()` is safe as it's checked for null.
            // The `guard` protects the memory.
            node_ptr.as_ref()
        } {
            if unsafe {
                // SAFETY: `node` is a valid reference. `key` is `Some` for all non-head nodes.
                node.key.as_ref().unwrap_unchecked()
            } != key
            {
                return None; // Key not found.
            }

            // If the node is already marked as deleted by the vacuum, we can't do anything.
            if node.deleted.load(Ordering::Acquire) {
                return None;
            }

            // Walk the version chain newest-first, trying to expire the first
            // version visible to this transaction's snapshot.
            let mut version_ptr = node.value.load(Ordering::Acquire, guard);
            while let Some(version_node) = unsafe {
                // SAFETY: `version_ptr` is a `Shared` pointer. `as_ref()` is safe as it's checked for null.
                // The `guard` protects the memory.
                version_ptr.as_ref()
            } {
                // Check if the version is visible to the current transaction.
                let is_visible = transaction
                    .snapshot
                    .is_visible(&version_node.version, &*self.tx_manager);

                if is_visible {
                    // This is a version we can try to expire.
                    // Atomically set the expirer_txid from 0 (= "not expired") to our transaction ID.
                    match version_node.version.expirer_txid.compare_exchange(
                        0,
                        transaction_id,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // Success! We expired this version.
                            return Some(version_node.version.value.clone());
                        }
                        Err(_) => {
                            // CAS failed. Another concurrent transaction just expired it.
                            // This version is no longer visible to us.
                            // We continue the loop to find the next visible version.
                        }
                    }
                }

                // Move to the next version in the chain.
                version_ptr = version_node.next.load(Ordering::Acquire, guard);
            }

            // If we reach here, no visible version was found or we lost all races.
            return None;
        } else {
            return None; // Node not found.
        }
    }
652
    /// Scans a range of keys and returns the visible versions as a `Vec`.
    ///
    /// The range is inclusive at both ends: keys `k` with `start <= k <= end`
    /// are considered. For each key, at most one (the newest visible) version
    /// is returned; logically deleted nodes are skipped.
    pub fn range(&self, start: &K, end: &K, snapshot: &Snapshot) -> Vec<(K, Arc<V>)> {
        let guard = &crossbeam_epoch::pin();
        let mut results = Vec::new();

        // Start the base-level walk just before `start`.
        let predecessor = self.find_optimistic_predecessor::<K>(start, guard);
        let mut current = unsafe {
            // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
            // The `guard` protects the memory.
            predecessor.deref().next[0].load(Ordering::Acquire, guard)
        };

        loop {
            if let Some(node_ref) = unsafe {
                // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe as it's checked for null.
                // The `guard` protects the memory.
                current.as_ref()
            } {
                if node_ref.deleted.load(Ordering::Acquire) {
                    current = node_ref.next[0].load(Ordering::Acquire, guard);
                    continue;
                }
                let key = unsafe {
                    // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
                    node_ref.key.as_ref().unwrap_unchecked()
                };
                // Keys are sorted, so the first key past `end` terminates the scan.
                if key > end {
                    break;
                }
                if key >= start {
                    let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
                    while let Some(version_node) = unsafe {
                        // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
                        // The `guard` protects the memory.
                        current_version_ptr.as_ref()
                    } {
                        let is_visible =
                            snapshot.is_visible(&version_node.version, &*self.tx_manager);

                        if is_visible {
                            results.push((key.clone(), version_node.version.value.clone()));
                            break; // Found the visible version for this key, move to next key
                        }
                        current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
                    }
                }
                current = node_ref.next[0].load(Ordering::Acquire, guard);
            } else {
                break;
            }
        }
        results
    }
706
    /// Returns a stream that yields visible key-value pairs within a given range.
    ///
    /// Same semantics as [`SkipList::range`] (inclusive bounds, newest visible
    /// version per key), but lazily, one item per poll.
    ///
    /// NOTE(review): the epoch `Guard` is pinned for the stream's whole
    /// lifetime, including across `yield` points — a slowly consumed stream
    /// presumably delays epoch-based memory reclamation; confirm this is
    /// acceptable for long-lived streams.
    pub fn range_stream<'a>(
        &'a self,
        start: &'a K,
        end: &'a K,
        snapshot: &'a Snapshot,
    ) -> impl Stream<Item = (K, Arc<V>)> + 'a {
        // Use async_stream to create a true streaming iterator
        async_stream::stream! {
            let guard = &crossbeam_epoch::pin();
            let predecessor = self.find_optimistic_predecessor::<K>(start, guard);
            let mut current = unsafe {
                // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
                // The `guard` protects the memory.
                predecessor.deref().next[0].load(Ordering::Acquire, guard)
            };

            loop {
                if let Some(node_ref) = unsafe {
                    // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe.
                    // The `guard` protects the memory.
                    current.as_ref()
                } {
                    if node_ref.deleted.load(Ordering::Acquire) {
                        current = node_ref.next[0].load(Ordering::Acquire, guard);
                        continue;
                    }
                    let key = unsafe {
                        // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
                        node_ref.key.as_ref().unwrap_unchecked()
                    };
                    // Sorted order: the first key past `end` ends the stream.
                    if key > end {
                        break;
                    }
                    if key >= start {
                        let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
                        while let Some(version_node) = unsafe {
                            // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
                            // The `guard` protects the memory.
                            current_version_ptr.as_ref()
                        } {
                            let is_visible = snapshot.is_visible(&version_node.version, &*self.tx_manager);

                            if is_visible {
                                yield (key.clone(), version_node.version.value.clone());
                                break; // Found the visible version for this key, move to next key
                            }
                            current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
                        }
                    }
                    current = node_ref.next[0].load(Ordering::Acquire, guard);
                } else {
                    break;
                }
            }
        }
    }
764}
765
766impl<K, V> SkipList<K, V>
767where
768 K: Ord
769 + Clone
770 + Send
771 + Sync
772 + 'static
773 + Borrow<str>
774 + std::hash::Hash
775 + Eq
776 + Serialize
777 + DeserializeOwned,
778 V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
779{
    /// Scans for keys starting with a given prefix and returns the visible versions as a `Vec`.
    ///
    /// Relies on the lexicographic order of the list: every key with the
    /// prefix compares `>= prefix`, and all such keys are contiguous, so the
    /// scan starts at the first key `>= prefix` and stops at the first
    /// non-matching key.
    pub fn prefix_scan(&self, prefix: &str, snapshot: &Snapshot) -> Vec<(K, Arc<V>)> {
        let guard = &crossbeam_epoch::pin();
        let mut results = Vec::new();

        let predecessor = self.find_optimistic_predecessor::<str>(prefix, guard);
        let mut current = unsafe {
            // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
            // The `guard` protects the memory.
            predecessor.deref().next[0].load(Ordering::Acquire, guard)
        };

        loop {
            if let Some(node_ref) = unsafe {
                // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe.
                // The `guard` protects the memory.
                current.as_ref()
            } {
                if node_ref.deleted.load(Ordering::Acquire) {
                    current = node_ref.next[0].load(Ordering::Acquire, guard);
                    continue;
                }

                let key = unsafe {
                    // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
                    node_ref.key.as_ref().unwrap_unchecked()
                };
                if key.borrow().starts_with(prefix) {
                    let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
                    while let Some(version_node) = unsafe {
                        // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
                        // The `guard` protects the memory.
                        current_version_ptr.as_ref()
                    } {
                        let is_visible =
                            snapshot.is_visible(&version_node.version, &*self.tx_manager);

                        if is_visible {
                            results.push((key.clone(), version_node.version.value.clone()));
                            break; // Found the visible version for this key, move to next key
                        }
                        current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
                    }
                } else {
                    // Since the skiplist is sorted, once we find a key that doesn't
                    // have the prefix, no subsequent keys will either.
                    break;
                }
                current = node_ref.next[0].load(Ordering::Acquire, guard);
            } else {
                break;
            }
        }
        results
    }
835
    /// Returns a stream that yields visible key-value pairs for keys starting with a given prefix.
    ///
    /// Streaming counterpart of [`SkipList::prefix_scan`]: keys with the prefix
    /// are contiguous in sorted order, so the walk stops at the first
    /// non-matching key.
    ///
    /// NOTE(review): the epoch `Guard` stays pinned across `yield` points for
    /// the stream's lifetime — presumably this delays epoch-based reclamation
    /// while the stream is held; confirm for long-lived consumers.
    pub fn prefix_scan_stream<'a>(
        &'a self,
        prefix: &'a str,
        snapshot: &'a Snapshot,
    ) -> impl Stream<Item = (K, Arc<V>)> + 'a {
        // Use async_stream to create a true streaming iterator
        async_stream::stream! {
            let guard = &crossbeam_epoch::pin();
            let predecessor = self.find_optimistic_predecessor::<str>(prefix, guard);
            let mut current = unsafe {
                // SAFETY: `predecessor` is a valid `Shared` pointer. `deref()` is safe.
                // The `guard` protects the memory.
                predecessor.deref().next[0].load(Ordering::Acquire, guard)
            };

            loop {
                if let Some(node_ref) = unsafe {
                    // SAFETY: `current` is a `Shared` pointer. `as_ref()` is safe.
                    // The `guard` protects the memory.
                    current.as_ref()
                } {
                    if node_ref.deleted.load(Ordering::Acquire) {
                        current = node_ref.next[0].load(Ordering::Acquire, guard);
                        continue;
                    }

                    let key = unsafe {
                        // SAFETY: `node_ref` is a valid reference. `key` is `Some` for all non-head nodes.
                        node_ref.key.as_ref().unwrap_unchecked()
                    };
                    if key.borrow().starts_with(prefix) {
                        let mut current_version_ptr = node_ref.value.load(Ordering::Acquire, guard);
                        while let Some(version_node) = unsafe {
                            // SAFETY: `current_version_ptr` is a `Shared` pointer. `as_ref()` is safe.
                            // The `guard` protects the memory.
                            current_version_ptr.as_ref()
                        } {
                            let is_visible = snapshot.is_visible(&version_node.version, &*self.tx_manager);

                            if is_visible {
                                yield (key.clone(), version_node.version.value.clone());
                                break; // Found the visible version for this key, move to next key
                            }
                            current_version_ptr = version_node.next.load(Ordering::Acquire, guard);
                        }
                    } else {
                        // Sorted order: no later key can carry the prefix.
                        break;
                    }
                    current = node_ref.next[0].load(Ordering::Acquire, guard);
                } else {
                    break;
                }
            }
        }
    }
892}