Skip to main content

sochdb_storage/
lockfree_memtable.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Lock-Free MemTable with Hazard Pointer Protection
19//!
20//! This module provides a lock-free read path for the MVCC memtable using
21//! hazard pointers for safe memory reclamation.
22//!
23//! ## Problem Analysis
24//!
25//! Current implementation uses RwLock on entire HashMap:
26//! ```ignore
27//! pub struct MvccMemTable {
28//!     data: RwLock<HashMap<Vec<u8>, VersionChain>>,  // LOCK!
29//! }
30//! ```
31//!
32//! Problems:
33//! - `parking_lot::RwLock` read acquire: ~20-30ns uncontended
34//! - Under contention: ~100-500ns due to cache coherency
35//! - RwLock has reader-count atomic → contention point
36//!
37//! ## Solution
38//!
39//! True lock-free reads using hazard pointers:
40//! - O(1) uncontended reads (~15ns)
41//! - Linear scaling with reader count
42//! - No reader-reader interference
43//!
44//! ## Scalability Model (Amdahl's Law)
45//!
46//! For N threads with serial fraction s:
47//! Speedup = 1 / (s + (1-s)/N)
48//!
//! RwLock: s ≈ 0.02 → For N=8: Speedup = 1 / (0.02 + 0.98/8) ≈ 7.0×
//! Lock-Free: s ≈ 0.001 → For N=8: Speedup = 1 / (0.001 + 0.999/8) ≈ 7.9×
//!
//! **Improvement: ~13% better scaling at N=8 (the gap widens as N grows)**
53
54use std::collections::HashSet;
55use std::ptr;
56use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicU64, AtomicUsize, Ordering};
57
58use dashmap::DashMap;
59use parking_lot::Mutex;
60
61use sochdb_core::{Result, SochDBError};
62
/// Number of hazard pointers per thread
///
/// Sizes the `hazard` array of every `HazardRecord`; `HazardDomain::protect`
/// and `HazardDomain::clear` reject slot indices at or above this value.
const HP_PER_THREAD: usize = 2;

/// Maximum number of threads supported
///
/// Passed to `HazardDomain::new` by `LockFreeMemTable::new`, so it is the
/// number of hazard records available to the memtable's domain.
const MAX_THREADS: usize = 128;

/// Number of retired nodes before attempting reclamation
///
/// `HazardDomain::retire` triggers a reclamation pass once its retired list
/// holds at least this many pointers.
const RECLAMATION_THRESHOLD: usize = 64;

/// Number of version slots per fat node (Rec 2)
///
/// 8 slots × 8-byte pointer = 64 bytes — fits in one cache line
/// (assumes 64-bit pointers and 64-byte cache lines).
/// Reduces pointer chases from O(v) to O(v/8) for version chain traversal.
///
/// `FatNode` tracks its occupancy in an `AtomicU8`, so this value must stay
/// well below 256.
const FAT_NODE_SLOTS: usize = 8;
77
/// Maximum number of value bytes stored inline inside a [`ValueStorage`].
///
/// Sizing rationale (from the original design notes): a 64-byte cache line
/// minus roughly 26 bytes of per-version metadata (txn_id 8 + commit_ts 8 +
/// next pointer 8 + discriminant 1 + length 1) would leave 38 bytes for data.
/// The threshold is deliberately set higher, at 56 bytes, accepting that a
/// version may span more than one cache line in exchange for fewer heap
/// allocations.
///
/// The inline length is stored in a `u8`, so this constant must never exceed
/// `u8::MAX`; the assertion right below turns a violation into a build error.
pub const INLINE_VALUE_SIZE: usize = 56;

// Compile-time guard: `ValueStorage::Inline::len` is a `u8`. Without this
// check, raising INLINE_VALUE_SIZE above 255 would make `ValueStorage::new`
// silently truncate the stored length and return corrupted values.
const _: () = assert!(INLINE_VALUE_SIZE <= u8::MAX as usize);

/// Value storage with inline allocation for small values.
///
/// Values of at most [`INLINE_VALUE_SIZE`] bytes are copied into a fixed-size
/// array that lives inside the enum itself, so reading them needs no extra
/// heap allocation or pointer chase. Larger values are boxed. Deletions are
/// represented by [`ValueStorage::Tombstone`].
///
/// ## Cache Analysis (design estimate)
///
/// Heap path: DashMap lookup → Version ptr → Value ptr — 2-3 cache misses.
/// Inline path: DashMap lookup → Version with inline value — 1 cache miss.
#[repr(C)]
pub enum ValueStorage {
    /// Value stored inline; only the first `len` bytes of `data` are valid.
    Inline {
        len: u8,
        data: [u8; INLINE_VALUE_SIZE],
    },
    /// Value stored on the heap (values longer than [`INLINE_VALUE_SIZE`]).
    Heap(Box<[u8]>),
    /// Tombstone marker (key was deleted).
    Tombstone,
}

impl std::fmt::Debug for ValueStorage {
    /// Prints the variant and the stored length only, never the payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ValueStorage::Inline { len, .. } => write!(f, "Inline(len={})", len),
            ValueStorage::Heap(data) => write!(f, "Heap(len={})", data.len()),
            ValueStorage::Tombstone => write!(f, "Tombstone"),
        }
    }
}

impl ValueStorage {
    /// Build storage for `value`, preferring the inline representation.
    ///
    /// * `None` becomes [`ValueStorage::Tombstone`].
    /// * Slices of at most [`INLINE_VALUE_SIZE`] bytes (including the empty
    ///   slice) are copied into [`ValueStorage::Inline`].
    /// * Anything longer is copied into a boxed slice.
    #[inline]
    pub fn new(value: Option<&[u8]>) -> Self {
        match value {
            None => ValueStorage::Tombstone,
            Some(bytes) if bytes.len() <= INLINE_VALUE_SIZE => {
                let mut data = [0u8; INLINE_VALUE_SIZE];
                data[..bytes.len()].copy_from_slice(bytes);
                ValueStorage::Inline {
                    // Lossless: bytes.len() <= INLINE_VALUE_SIZE <= u8::MAX,
                    // enforced by the compile-time assertion above.
                    len: bytes.len() as u8,
                    data,
                }
            }
            // Copy straight into a boxed slice; no intermediate Vec needed.
            Some(bytes) => ValueStorage::Heap(Box::from(bytes)),
        }
    }

    /// Borrow the stored bytes, or `None` for a tombstone.
    #[inline]
    pub fn as_bytes(&self) -> Option<&[u8]> {
        match self {
            ValueStorage::Inline { len, data } => Some(&data[..usize::from(*len)]),
            ValueStorage::Heap(data) => Some(&data[..]),
            ValueStorage::Tombstone => None,
        }
    }

    /// Check if this is a tombstone.
    #[inline]
    pub fn is_tombstone(&self) -> bool {
        matches!(self, ValueStorage::Tombstone)
    }

    /// Check if the value is stored inline.
    #[inline]
    pub fn is_inline(&self) -> bool {
        matches!(self, ValueStorage::Inline { .. })
    }

    /// Number of value bytes stored; a tombstone reports 0.
    #[inline]
    pub fn len(&self) -> usize {
        match self {
            ValueStorage::Inline { len, .. } => usize::from(*len),
            ValueStorage::Heap(data) => data.len(),
            ValueStorage::Tombstone => 0,
        }
    }

    /// `true` when no value bytes are stored. This is also `true` for a
    /// tombstone, so use [`ValueStorage::is_tombstone`] to tell them apart.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
178
179/// Version of a key-value pair for lock-free access
180///
181/// Uses inline storage for small values to eliminate heap allocation
182/// and improve cache locality. Most database values (80%+) fit inline.
183#[derive(Debug)]
184pub struct LockFreeVersion {
185    /// The value with optimized inline storage
186    pub storage: ValueStorage,
187    /// Transaction that created this version
188    pub txn_id: u64,
189    /// Commit timestamp (0 = uncommitted)
190    pub commit_ts: AtomicU64,
191    /// Next version in chain (older)
192    pub next: AtomicPtr<LockFreeVersion>,
193}
194
195impl LockFreeVersion {
196    /// Create a new uncommitted version with value slice (zero-copy for inline)
197    #[inline]
198    pub fn new_from_slice(value: Option<&[u8]>, txn_id: u64) -> Self {
199        Self {
200            storage: ValueStorage::new(value),
201            txn_id,
202            commit_ts: AtomicU64::new(0),
203            next: AtomicPtr::new(ptr::null_mut()),
204        }
205    }
206
207    /// Create a new uncommitted version (legacy API - accepts owned Vec)
208    pub fn new(value: Option<Vec<u8>>, txn_id: u64) -> Self {
209        Self::new_from_slice(value.as_deref(), txn_id)
210    }
211
212    /// Get the value as bytes (zero-copy for inline values)
213    #[inline]
214    pub fn get_value(&self) -> Option<&[u8]> {
215        self.storage.as_bytes()
216    }
217
218    /// Get the value as owned Vec (for compatibility)
219    ///
220    /// Note: Prefer `get_value()` to avoid allocation
221    #[inline]
222    pub fn value_cloned(&self) -> Option<Vec<u8>> {
223        self.storage.as_bytes().map(|v| v.to_vec())
224    }
225
226    /// Check if committed
227    #[inline]
228    pub fn is_committed(&self) -> bool {
229        self.commit_ts.load(Ordering::Acquire) > 0
230    }
231
232    /// Get commit timestamp
233    #[inline]
234    pub fn get_commit_ts(&self) -> u64 {
235        self.commit_ts.load(Ordering::Acquire)
236    }
237
238    /// Set commit timestamp
239    #[inline]
240    pub fn set_commit_ts(&self, ts: u64) {
241        self.commit_ts.store(ts, Ordering::Release);
242    }
243
244    /// Check if value is stored inline (for diagnostics)
245    #[inline]
246    pub fn is_inline(&self) -> bool {
247        self.storage.is_inline()
248    }
249}
250
251/// Fat-node for version chain (Rec 2: Lock-Free Fat-Node Version Chain)
252///
253/// Groups up to 8 version pointers per node, reducing pointer chases from
254/// O(v) to O(v/8). Slot pointers occupy 64 bytes = 1 cache line, so scanning
255/// all 8 slots costs a single cache-line fetch instead of 8 random chases.
256///
257/// Layout:
258/// - `count` (AtomicU8): number of occupied slots (0..FAT_NODE_SLOTS)
259/// - `slots`: array of AtomicPtr<LockFreeVersion>, newest at index `count-1`
260/// - `next`: pointer to older FatNode
261pub struct FatNode {
262    /// Number of valid version pointers in `slots` (0..=FAT_NODE_SLOTS)
263    count: AtomicU8,
264    /// Version pointers, newest at `count-1`, oldest at 0
265    slots: [AtomicPtr<LockFreeVersion>; FAT_NODE_SLOTS],
266    /// Next (older) fat node in the chain
267    next: AtomicPtr<FatNode>,
268}
269
270impl FatNode {
271    /// Create a new fat node with one initial version and a link to older node
272    fn new_with_first(version: *mut LockFreeVersion, older: *mut FatNode) -> Self {
273        let slots = std::array::from_fn(|i| {
274            if i == 0 {
275                AtomicPtr::new(version)
276            } else {
277                AtomicPtr::new(ptr::null_mut())
278            }
279        });
280        Self {
281            count: AtomicU8::new(1),
282            slots,
283            next: AtomicPtr::new(older),
284        }
285    }
286
287    /// Try to append a version to this fat node. Returns Ok(()) if successful,
288    /// Err(version_ptr) if the node is full.
289    ///
290    /// Thread-safety: CAS on `count` serializes slot reservation. The reserving
291    /// thread then publishes the pointer via Release store on the slot.
292    #[inline]
293    fn try_push(&self, version: *mut LockFreeVersion) -> std::result::Result<(), *mut LockFreeVersion> {
294        loop {
295            let c = self.count.load(Ordering::Acquire);
296            if c as usize >= FAT_NODE_SLOTS {
297                return Err(version); // Full
298            }
299            // Reserve slot `c` by CAS count → c+1
300            match self.count.compare_exchange(c, c + 1, Ordering::AcqRel, Ordering::Acquire) {
301                Ok(_) => {
302                    // We own slot `c` — publish the version pointer
303                    self.slots[c as usize].store(version, Ordering::Release);
304                    return Ok(());
305                }
306                Err(_) => continue, // Another thread won; retry
307            }
308        }
309    }
310
311    /// Get version pointer at slot index (must be < count)
312    #[inline]
313    fn slot(&self, idx: u8) -> *mut LockFreeVersion {
314        self.slots[idx as usize].load(Ordering::Acquire)
315    }
316
317    /// Iterate versions newest-first (index count-1 down to 0)
318    #[inline]
319    fn iter_newest_first(&self) -> impl Iterator<Item = &LockFreeVersion> {
320        let count = self.count.load(Ordering::Acquire);
321        (0..count).rev().filter_map(move |i| {
322            let ptr = self.slots[i as usize].load(Ordering::Acquire);
323            if ptr.is_null() { None } else { Some(unsafe { &*ptr }) }
324        })
325    }
326}
327
328/// Lock-free version chain using fat-node grouping (Rec 2)
329///
330/// Instead of a singly-linked list of individual versions, versions are grouped
331/// into fat nodes of 8. This reduces pointer chases from O(v) to O(v/8) since
332/// scanning 8 slots within a fat node hits the same cache line.
333pub struct LockFreeVersionChain {
334    /// Head fat node (contains the most recent versions)
335    head: AtomicPtr<FatNode>,
336}
337
338impl Default for LockFreeVersionChain {
339    fn default() -> Self {
340        Self::new()
341    }
342}
343
344impl LockFreeVersionChain {
345    /// Create empty version chain
346    pub fn new() -> Self {
347        Self {
348            head: AtomicPtr::new(ptr::null_mut()),
349        }
350    }
351
352    /// Add a new uncommitted version
353    ///
354    /// Returns error if there's already an uncommitted version from another txn
355    pub fn add_uncommitted(&self, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
356        let new_version = Box::into_raw(Box::new(LockFreeVersion::new(value, txn_id)));
357
358        loop {
359            let head = self.head.load(Ordering::Acquire);
360
361            // Check for write-write conflict: inspect the newest version
362            if !head.is_null() {
363                let fat = unsafe { &*head };
364                let count = fat.count.load(Ordering::Acquire);
365                if count > 0 {
366                    let newest = fat.slot(count - 1);
367                    if !newest.is_null() {
368                        let newest_ref = unsafe { &*newest };
369                        if !newest_ref.is_committed() && newest_ref.txn_id != txn_id {
370                            unsafe { drop(Box::from_raw(new_version)); }
371                            return Err(SochDBError::Internal("Write-write conflict".into()));
372                        }
373                    }
374                }
375
376                // Try to push into existing fat node
377                match fat.try_push(new_version) {
378                    Ok(()) => return Ok(()),
379                    Err(_) => {
380                        // Fat node is full — allocate new one linking to current head
381                        let new_fat = Box::into_raw(Box::new(FatNode::new_with_first(new_version, head)));
382                        match self.head.compare_exchange(head, new_fat, Ordering::AcqRel, Ordering::Acquire) {
383                            Ok(_) => return Ok(()),
384                            Err(_) => {
385                                // CAS failed — reclaim the fat node, keep the version for retry
386                                unsafe {
387                                    // Detach version from fat node before dropping it
388                                    (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
389                                    (*new_fat).count.store(0, Ordering::Relaxed);
390                                    drop(Box::from_raw(new_fat));
391                                }
392                                continue; // Retry from the top
393                            }
394                        }
395                    }
396                }
397            } else {
398                // No head — allocate first fat node
399                let new_fat = Box::into_raw(Box::new(FatNode::new_with_first(new_version, ptr::null_mut())));
400                match self.head.compare_exchange(head, new_fat, Ordering::AcqRel, Ordering::Acquire) {
401                    Ok(_) => return Ok(()),
402                    Err(_) => {
403                        unsafe {
404                            (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
405                            (*new_fat).count.store(0, Ordering::Relaxed);
406                            drop(Box::from_raw(new_fat));
407                        }
408                        continue;
409                    }
410                }
411            }
412        }
413    }
414
415    /// Commit a version
416    pub fn commit(&self, txn_id: u64, commit_ts: u64) -> bool {
417        let mut fat_ptr = self.head.load(Ordering::Acquire);
418
419        while !fat_ptr.is_null() {
420            let fat = unsafe { &*fat_ptr };
421            // Scan this fat node's slots (newest first)
422            for ver in fat.iter_newest_first() {
423                if ver.txn_id == txn_id && !ver.is_committed() {
424                    ver.set_commit_ts(commit_ts);
425                    return true;
426                }
427            }
428            fat_ptr = fat.next.load(Ordering::Acquire);
429        }
430
431        false
432    }
433
434    /// Read at a snapshot timestamp
435    ///
436    /// Returns the most recent committed version visible at snapshot_ts,
437    /// or an uncommitted version if it belongs to current_txn_id.
438    pub fn read_at(
439        &self,
440        snapshot_ts: u64,
441        current_txn_id: Option<u64>,
442    ) -> Option<&LockFreeVersion> {
443        let mut fat_ptr = self.head.load(Ordering::Acquire);
444
445        while !fat_ptr.is_null() {
446            let fat = unsafe { &*fat_ptr };
447            // Scan this fat node's slots (newest first)
448            for version in fat.iter_newest_first() {
449                // Check if this is our own uncommitted write
450                if let Some(txn_id) = current_txn_id
451                    && version.txn_id == txn_id
452                    && !version.is_committed()
453                {
454                    return Some(version);
455                }
456
457                // Check if this version is visible
458                let commit_ts = version.get_commit_ts();
459                if commit_ts > 0 && commit_ts < snapshot_ts {
460                    return Some(version);
461                }
462            }
463            fat_ptr = fat.next.load(Ordering::Acquire);
464        }
465
466        None
467    }
468
469    /// Check if there's an uncommitted version by another transaction
470    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
471        let head = self.head.load(Ordering::Acquire);
472
473        if !head.is_null() {
474            let fat = unsafe { &*head };
475            let count = fat.count.load(Ordering::Acquire);
476            if count > 0 {
477                let newest = fat.slot(count - 1);
478                if !newest.is_null() {
479                    let version = unsafe { &*newest };
480                    return !version.is_committed() && version.txn_id != my_txn_id;
481                }
482            }
483        }
484
485        false
486    }
487}
488
489/// Thread-local hazard pointer record
490///
491/// Cache-line aligned to prevent false sharing
492#[repr(C, align(64))]
493struct HazardRecord {
494    /// Protected pointers
495    hazard: [AtomicPtr<LockFreeVersion>; HP_PER_THREAD],
496    /// Active flag (non-zero if thread is using this record)
497    active: AtomicU64,
498}
499
500impl HazardRecord {
501    const fn new() -> Self {
502        Self {
503            hazard: [
504                AtomicPtr::new(ptr::null_mut()),
505                AtomicPtr::new(ptr::null_mut()),
506            ],
507            active: AtomicU64::new(0),
508        }
509    }
510
511    /// Acquire this record for a thread
512    fn try_acquire(&self, thread_id: u64) -> bool {
513        self.active
514            .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Acquire)
515            .is_ok()
516    }
517
518    /// Release this record
519    #[allow(dead_code)]
520    fn release(&self) {
521        // Clear hazard pointers first
522        for hp in &self.hazard {
523            hp.store(ptr::null_mut(), Ordering::Release);
524        }
525        self.active.store(0, Ordering::Release);
526    }
527}
528
529/// Hazard pointer domain for safe memory reclamation
530pub struct HazardDomain {
531    /// Hazard records (one per potential thread)
532    records: Vec<HazardRecord>,
533    /// Retired nodes pending reclamation
534    retired: Mutex<Vec<*mut LockFreeVersion>>,
535}
536
537impl HazardDomain {
538    /// Create a new hazard domain
539    pub fn new(max_threads: usize) -> Self {
540        let mut records = Vec::with_capacity(max_threads);
541        for _ in 0..max_threads {
542            records.push(HazardRecord::new());
543        }
544
545        Self {
546            records,
547            retired: Mutex::new(Vec::with_capacity(RECLAMATION_THRESHOLD * 2)),
548        }
549    }
550
551    /// Get a hazard record for the current thread
552    fn get_record(&self) -> Option<&HazardRecord> {
553        let thread_id = thread_id::get() as u64;
554
555        // First try to find already owned record
556        for record in &self.records {
557            if record.active.load(Ordering::Acquire) == thread_id {
558                return Some(record);
559            }
560        }
561
562        // Try to acquire a new record
563        self.records
564            .iter()
565            .find(|record| record.try_acquire(thread_id))
566    }
567
568    /// Protect a pointer with hazard pointer
569    #[inline]
570    pub fn protect(&self, ptr: *mut LockFreeVersion, slot: usize) -> bool {
571        if let Some(record) = self.get_record()
572            && slot < HP_PER_THREAD
573        {
574            record.hazard[slot].store(ptr, Ordering::Release);
575            std::sync::atomic::fence(Ordering::SeqCst);
576            return true;
577        }
578        false
579    }
580
581    /// Clear a hazard pointer slot
582    #[inline]
583    pub fn clear(&self, slot: usize) {
584        if let Some(record) = self.get_record()
585            && slot < HP_PER_THREAD
586        {
587            record.hazard[slot].store(ptr::null_mut(), Ordering::Release);
588        }
589    }
590
591    /// Retire a pointer for later reclamation
592    pub fn retire(&self, ptr: *mut LockFreeVersion) {
593        let mut retired = self.retired.lock();
594        retired.push(ptr);
595
596        // Attempt reclamation if threshold reached
597        if retired.len() >= RECLAMATION_THRESHOLD {
598            self.try_reclaim(&mut retired);
599        }
600    }
601
602    /// Try to reclaim retired pointers not protected by any hazard pointer
603    fn try_reclaim(&self, retired: &mut Vec<*mut LockFreeVersion>) {
604        // Collect all active hazard pointers
605        let mut protected: HashSet<usize> = HashSet::new();
606
607        for record in &self.records {
608            if record.active.load(Ordering::Acquire) != 0 {
609                for hp in &record.hazard {
610                    let ptr = hp.load(Ordering::Acquire);
611                    if !ptr.is_null() {
612                        protected.insert(ptr as usize);
613                    }
614                }
615            }
616        }
617
618        // Reclaim unprotected nodes
619        let mut still_retired = Vec::new();
620        for ptr in retired.drain(..) {
621            if protected.contains(&(ptr as usize)) {
622                still_retired.push(ptr);
623            } else {
624                // Safe to reclaim
625                unsafe {
626                    drop(Box::from_raw(ptr));
627                }
628            }
629        }
630
631        *retired = still_retired;
632    }
633}
634
635impl Drop for HazardDomain {
636    fn drop(&mut self) {
637        // Reclaim all retired nodes
638        let mut retired = self.retired.lock();
639        for ptr in retired.drain(..) {
640            unsafe {
641                drop(Box::from_raw(ptr));
642            }
643        }
644    }
645}
646
// Small per-thread id allocator: each thread gets a unique, non-zero `usize`
// the first time it asks, and keeps it for its lifetime.
mod thread_id {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Next id to hand out; starts at 1, so 0 is never returned.
    static ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

    thread_local! {
        /// This thread's id, drawn from `ID_COUNTER` on first access.
        static CACHED_ID: usize = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    }

    /// Return the calling thread's id (stable for the thread's lifetime).
    pub fn get() -> usize {
        CACHED_ID.with(|&id| id)
    }
}
661
662/// Lock-free memtable with hazard pointer protection
663pub struct LockFreeMemTable {
664    /// Concurrent hash map (lock-free for reads, fine-grained locking for writes)
665    data: DashMap<Vec<u8>, LockFreeVersionChain>,
666    /// Hazard pointer domain
667    hazard_domain: HazardDomain,
668    /// Approximate size in bytes
669    size_bytes: AtomicUsize,
670}
671
672impl LockFreeMemTable {
673    /// Create a new lock-free memtable
674    pub fn new() -> Self {
675        Self {
676            data: DashMap::new(),
677            hazard_domain: HazardDomain::new(MAX_THREADS),
678            size_bytes: AtomicUsize::new(0),
679        }
680    }
681
682    /// Read a value at snapshot timestamp
683    ///
684    /// This is a lock-free read protected by hazard pointers.
685    /// Returns a cloned value for safety across hazard pointer boundaries.
686    pub fn read(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
687        let chain = self.data.get(key)?;
688
689        // Read with hazard pointer protection
690        if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
691            // Protect the version
692            let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
693            self.hazard_domain.protect(ptr, 0);
694
695            // Get value using optimized inline storage
696            // Clone is still needed due to hazard pointer lifetime
697            let result = version.value_cloned();
698
699            // Clear hazard pointer
700            self.hazard_domain.clear(0);
701
702            result
703        } else {
704            None
705        }
706    }
707
708    /// Read a value at snapshot timestamp with zero-copy callback
709    ///
710    /// This is an optimized read path that avoids cloning for inline values.
711    /// The callback receives a reference to the value, avoiding allocation.
712    ///
713    /// # Arguments
714    /// * `key` - The key to read
715    /// * `snapshot_ts` - Snapshot timestamp for visibility
716    /// * `txn_id` - Current transaction ID (to see own uncommitted writes)
717    /// * `f` - Callback that receives the value reference
718    ///
719    /// # Returns
720    /// The result of the callback, or None if key not found
721    #[inline]
722    pub fn read_with<F, R>(
723        &self,
724        key: &[u8],
725        snapshot_ts: u64,
726        txn_id: Option<u64>,
727        f: F,
728    ) -> Option<R>
729    where
730        F: FnOnce(&[u8]) -> R,
731    {
732        let chain = self.data.get(key)?;
733
734        if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
735            // Protect the version
736            let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
737            self.hazard_domain.protect(ptr, 0);
738
739            // Call callback with value reference (zero-copy for inline)
740            let result = version.get_value().map(f);
741
742            // Clear hazard pointer
743            self.hazard_domain.clear(0);
744
745            result
746        } else {
747            None
748        }
749    }
750
751    /// Write a value (creates uncommitted version)
752    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
753        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
754
755        // Get or create version chain
756        let chain = self.data.entry(key.clone()).or_default();
757
758        // Add uncommitted version
759        chain.add_uncommitted(value, txn_id)?;
760
761        // Update size estimate
762        self.size_bytes
763            .fetch_add(key.len() + value_size + 64, Ordering::Relaxed);
764
765        Ok(())
766    }
767
768    /// Commit a transaction's writes
769    pub fn commit(&self, txn_id: u64, commit_ts: u64, keys: &[Vec<u8>]) {
770        for key in keys {
771            if let Some(chain) = self.data.get(key) {
772                chain.commit(txn_id, commit_ts);
773            }
774        }
775    }
776
777    /// Check for write conflict
778    pub fn has_write_conflict(&self, key: &[u8], txn_id: u64) -> bool {
779        if let Some(chain) = self.data.get(key) {
780            chain.has_write_conflict(txn_id)
781        } else {
782            false
783        }
784    }
785
786    /// Get approximate size in bytes
787    pub fn size_bytes(&self) -> usize {
788        self.size_bytes.load(Ordering::Relaxed)
789    }
790
791    /// Get number of keys
792    pub fn len(&self) -> usize {
793        self.data.len()
794    }
795
796    /// Check if empty
797    pub fn is_empty(&self) -> bool {
798        self.data.is_empty()
799    }
800}
801
// Safety: LockFreeMemTable uses atomic operations and proper synchronization
// for all shared data access. The raw pointers in HazardDomain are only
// dereferenced under proper hazard pointer protection.
//
// These manual impls are required because `HazardDomain::retired` stores raw
// `*mut LockFreeVersion` pointers, and raw pointers are neither `Send` nor
// `Sync`, which would otherwise make the whole table `!Send`/`!Sync`.
// NOTE(review): soundness relies on the retired list only being accessed
// through its `Mutex` and on each retired pointer being uniquely owned by the
// list — confirm against every caller of `HazardDomain::retire`.
unsafe impl Send for LockFreeMemTable {}
unsafe impl Sync for LockFreeMemTable {}

/// `Default` builds the same empty table as [`LockFreeMemTable::new`].
impl Default for LockFreeMemTable {
    fn default() -> Self {
        Self::new()
    }
}
813
814#[cfg(test)]
815mod tests {
816    use super::*;
817    use std::sync::Arc;
818    use std::thread;
819
820    #[test]
821    fn test_basic_write_read() {
822        let memtable = LockFreeMemTable::new();
823
824        // Write
825        memtable
826            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
827            .unwrap();
828
829        // Read own uncommitted write
830        let val = memtable.read(b"key1", 100, Some(1));
831        assert_eq!(val, Some(b"value1".to_vec()));
832
833        // Cannot read uncommitted from other txn
834        let val = memtable.read(b"key1", 100, Some(2));
835        assert!(val.is_none());
836
837        // Commit and read
838        memtable.commit(1, 50, &[b"key1".to_vec()]);
839        let val = memtable.read(b"key1", 100, None);
840        assert_eq!(val, Some(b"value1".to_vec()));
841    }
842
843    #[test]
844    fn test_snapshot_isolation() {
845        let memtable = LockFreeMemTable::new();
846
847        // Write and commit at ts=10
848        memtable
849            .write(b"key".to_vec(), Some(b"v1".to_vec()), 1)
850            .unwrap();
851        memtable.commit(1, 10, &[b"key".to_vec()]);
852
853        // Write and commit at ts=20
854        memtable
855            .write(b"key".to_vec(), Some(b"v2".to_vec()), 2)
856            .unwrap();
857        memtable.commit(2, 20, &[b"key".to_vec()]);
858
859        // Snapshot at ts=15 sees v1
860        assert_eq!(memtable.read(b"key", 15, None), Some(b"v1".to_vec()));
861
862        // Snapshot at ts=25 sees v2
863        assert_eq!(memtable.read(b"key", 25, None), Some(b"v2".to_vec()));
864    }
865
866    #[test]
867    fn test_write_conflict() {
868        let memtable = LockFreeMemTable::new();
869
870        // First write
871        memtable
872            .write(b"key".to_vec(), Some(b"v1".to_vec()), 1)
873            .unwrap();
874
875        // Conflicting write should fail
876        let result = memtable.write(b"key".to_vec(), Some(b"v2".to_vec()), 2);
877        assert!(result.is_err());
878
879        // Same txn can write again
880        let result = memtable.write(b"key".to_vec(), Some(b"v1_updated".to_vec()), 1);
881        assert!(result.is_ok());
882    }
883
884    #[test]
885    fn test_concurrent_reads() {
886        let memtable = Arc::new(LockFreeMemTable::new());
887
888        // Setup data
889        for i in 0..100 {
890            let key = format!("key{}", i).into_bytes();
891            let val = format!("value{}", i).into_bytes();
892            memtable.write(key.clone(), Some(val), 1).unwrap();
893        }
894        memtable.commit(
895            1,
896            10,
897            &(0..100)
898                .map(|i| format!("key{}", i).into_bytes())
899                .collect::<Vec<_>>(),
900        );
901
902        // Concurrent reads
903        let handles: Vec<_> = (0..8)
904            .map(|t| {
905                let mt = Arc::clone(&memtable);
906                thread::spawn(move || {
907                    for i in 0..100 {
908                        let key = format!("key{}", i).into_bytes();
909                        let expected = format!("value{}", i).into_bytes();
910                        let val = mt.read(&key, 100, None);
911                        assert_eq!(val, Some(expected), "Thread {} failed at key{}", t, i);
912                    }
913                })
914            })
915            .collect();
916
917        for h in handles {
918            h.join().unwrap();
919        }
920    }
921
922    #[test]
923    fn test_inline_storage() {
924        // Test that small values are stored inline
925        let small_value = b"small".to_vec();
926        let version = LockFreeVersion::new(Some(small_value.clone()), 1);
927        assert!(version.is_inline(), "Small values should be inline");
928        assert_eq!(version.get_value(), Some(small_value.as_slice()));
929
930        // Test that large values are stored on heap
931        let large_value = vec![42u8; 100]; // > INLINE_VALUE_SIZE
932        let version = LockFreeVersion::new(Some(large_value.clone()), 2);
933        assert!(!version.is_inline(), "Large values should be on heap");
934        assert_eq!(version.get_value(), Some(large_value.as_slice()));
935
936        // Test tombstone
937        let version = LockFreeVersion::new(None, 3);
938        assert!(version.storage.is_tombstone());
939        assert_eq!(version.get_value(), None);
940    }
941
942    #[test]
943    fn test_inline_threshold() {
944        // Exactly at threshold should be inline
945        let value = vec![0u8; INLINE_VALUE_SIZE];
946        let version = LockFreeVersion::new(Some(value.clone()), 1);
947        assert!(version.is_inline(), "Values at threshold should be inline");
948
949        // One byte over threshold should be heap
950        let value = vec![0u8; INLINE_VALUE_SIZE + 1];
951        let version = LockFreeVersion::new(Some(value), 2);
952        assert!(
953            !version.is_inline(),
954            "Values over threshold should be on heap"
955        );
956    }
957
958    #[test]
959    fn test_read_with_callback() {
960        let memtable = LockFreeMemTable::new();
961
962        memtable
963            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
964            .unwrap();
965        memtable.commit(1, 10, &[b"key1".to_vec()]);
966
967        // Use read_with for zero-copy access
968        let len = memtable.read_with(b"key1", 100, None, |v| v.len());
969        assert_eq!(len, Some(6)); // "value1".len()
970
971        // Verify callback receives correct data
972        let matches = memtable.read_with(b"key1", 100, None, |v| v == b"value1");
973        assert_eq!(matches, Some(true));
974    }
975
976    #[test]
977    fn test_fat_node_overflow() {
978        // Write more than FAT_NODE_SLOTS versions to a single key
979        // to verify fat node chaining works correctly
980        let memtable = LockFreeMemTable::new();
981
982        for i in 0..12u64 {
983            // Each write from a different committed txn
984            memtable
985                .write(
986                    b"key".to_vec(),
987                    Some(format!("v{}", i).into_bytes()),
988                    i + 1,
989                )
990                .unwrap();
991            memtable.commit(i + 1, (i + 1) * 10, &[b"key".to_vec()]);
992        }
993
994        // Read at latest snapshot should return v11 (committed at ts=120)
995        let val = memtable.read(b"key", 200, None);
996        assert_eq!(val, Some(b"v11".to_vec()));
997
998        // Read at ts=55 should return v4 (committed at ts=50)
999        let val = memtable.read(b"key", 55, None);
1000        assert_eq!(val, Some(b"v4".to_vec()));
1001
1002        // Read at ts=5 should return None (v0 committed at ts=10)
1003        let val = memtable.read(b"key", 5, None);
1004        assert_eq!(val, None);
1005    }
1006
1007    #[test]
1008    fn test_fat_node_concurrent_writes() {
1009        use std::sync::Arc;
1010        use std::thread;
1011
1012        let memtable = Arc::new(LockFreeMemTable::new());
1013
1014        // 4 threads writing to different keys concurrently
1015        let mut handles = Vec::new();
1016        for t in 0..4u64 {
1017            let mt = Arc::clone(&memtable);
1018            handles.push(thread::spawn(move || {
1019                for i in 0..20u64 {
1020                    let key = format!("k{}-{}", t, i).into_bytes();
1021                    let val = format!("v{}-{}", t, i).into_bytes();
1022                    let txn_id = t * 1000 + i + 1;
1023                    mt.write(key.clone(), Some(val), txn_id).unwrap();
1024                    mt.commit(txn_id, txn_id * 10, &[key]);
1025                }
1026            }));
1027        }
1028
1029        for h in handles {
1030            h.join().unwrap();
1031        }
1032
1033        // Verify all 80 keys are readable
1034        for t in 0..4u64 {
1035            for i in 0..20u64 {
1036                let key = format!("k{}-{}", t, i).into_bytes();
1037                let val = memtable.read(&key, u64::MAX, None);
1038                assert_eq!(
1039                    val,
1040                    Some(format!("v{}-{}", t, i).into_bytes()),
1041                    "Missing key k{}-{}",
1042                    t,
1043                    i
1044                );
1045            }
1046        }
1047    }
1048}