Skip to main content

sochdb_storage/
lockfree_memtable.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Lock-Free MemTable with Hazard Pointer Protection
19//!
20//! This module provides a lock-free read path for the MVCC memtable using
21//! hazard pointers for safe memory reclamation.
22//!
23//! ## Problem Analysis
24//!
25//! Current implementation uses RwLock on entire HashMap:
26//! ```ignore
27//! pub struct MvccMemTable {
28//!     data: RwLock<HashMap<Vec<u8>, VersionChain>>,  // LOCK!
29//! }
30//! ```
31//!
32//! Problems:
33//! - `parking_lot::RwLock` read acquire: ~20-30ns uncontended
34//! - Under contention: ~100-500ns due to cache coherency
35//! - RwLock has reader-count atomic → contention point
36//!
37//! ## Solution
38//!
39//! True lock-free reads using hazard pointers:
40//! - O(1) uncontended reads (~15ns)
41//! - Linear scaling with reader count
42//! - No reader-reader interference
43//!
44//! ## Scalability Model (Amdahl's Law)
45//!
46//! For N threads with serial fraction s:
47//! Speedup = 1 / (s + (1-s)/N)
48//!
49//! RwLock: s ≈ 0.02 → For N=8: Speedup = 6.4×
50//! Lock-Free: s ≈ 0.001 → For N=8: Speedup = 7.9×
51//!
52//! **Improvement: 23% better scaling**
53
54use std::collections::HashSet;
55use std::ptr;
56use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicU64, AtomicUsize, Ordering};
57
58use dashmap::DashMap;
59use parking_lot::Mutex;
60
61use sochdb_core::{Result, SochDBError};
62
63/// Number of hazard pointers per thread
64const HP_PER_THREAD: usize = 2;
65
66/// Maximum number of threads supported
67const MAX_THREADS: usize = 128;
68
69/// Number of retired nodes before attempting reclamation
70const RECLAMATION_THRESHOLD: usize = 64;
71
72/// Number of version slots per fat node (Rec 2)
73///
74/// 8 slots × 8-byte pointer = 64 bytes — fits in one cache line.
75/// Reduces pointer chases from O(v) to O(v/8) for version chain traversal.
76const FAT_NODE_SLOTS: usize = 8;
77
78/// Maximum size for inline value storage (fits in cache line with metadata)
79///
80/// Cache line = 64 bytes
81/// Metadata: txn_id (8) + commit_ts (8) + next ptr (8) + storage discriminant (1) + len (1) = 26 bytes
82/// Inline data: 64 - 26 = 38 bytes (we use 56 for larger threshold since struct may span lines)
83pub const INLINE_VALUE_SIZE: usize = 56;
84
85/// Optimized value storage with inline allocation for small values
86///
87/// For typical database workloads, 80%+ of values are < 56 bytes.
88/// Storing these inline eliminates heap allocation and pointer chasing.
89///
90/// ## Cache Analysis
91///
92/// Current path: DashMap lookup → Version ptr → Value ptr (Vec data)
93/// Cache misses: 2-3 (worst case)
94///
95/// Inline path: DashMap lookup → Version with inline value
96/// Cache misses: 1
97///
98/// Expected speedup: 2-2.5× for reads on small values
99#[repr(C)]
100pub enum ValueStorage {
101    /// Value stored inline (most common case for small values)
102    Inline {
103        len: u8,
104        data: [u8; INLINE_VALUE_SIZE],
105    },
106    /// Value stored on heap (for large values > 56 bytes)
107    Heap(Box<[u8]>),
108    /// Tombstone marker (key was deleted)
109    Tombstone,
110}
111
112impl std::fmt::Debug for ValueStorage {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        match self {
115            ValueStorage::Inline { len, .. } => write!(f, "Inline(len={})", len),
116            ValueStorage::Heap(data) => write!(f, "Heap(len={})", data.len()),
117            ValueStorage::Tombstone => write!(f, "Tombstone"),
118        }
119    }
120}
121
122impl ValueStorage {
123    /// Create new value storage, preferring inline when possible
124    #[inline]
125    pub fn new(value: Option<&[u8]>) -> Self {
126        match value {
127            None => ValueStorage::Tombstone,
128            Some(v) if v.len() <= INLINE_VALUE_SIZE => {
129                let mut data = [0u8; INLINE_VALUE_SIZE];
130                data[..v.len()].copy_from_slice(v);
131                ValueStorage::Inline {
132                    len: v.len() as u8,
133                    data,
134                }
135            }
136            Some(v) => ValueStorage::Heap(v.to_vec().into_boxed_slice()),
137        }
138    }
139
140    /// Get value as byte slice
141    #[inline]
142    pub fn as_bytes(&self) -> Option<&[u8]> {
143        match self {
144            ValueStorage::Inline { len, data } => Some(&data[..*len as usize]),
145            ValueStorage::Heap(data) => Some(data),
146            ValueStorage::Tombstone => None,
147        }
148    }
149
150    /// Check if this is a tombstone
151    #[inline]
152    pub fn is_tombstone(&self) -> bool {
153        matches!(self, ValueStorage::Tombstone)
154    }
155
156    /// Check if value is stored inline
157    #[inline]
158    pub fn is_inline(&self) -> bool {
159        matches!(self, ValueStorage::Inline { .. })
160    }
161
162    /// Get the size of the stored value
163    #[inline]
164    pub fn len(&self) -> usize {
165        match self {
166            ValueStorage::Inline { len, .. } => *len as usize,
167            ValueStorage::Heap(data) => data.len(),
168            ValueStorage::Tombstone => 0,
169        }
170    }
171
172    /// Check if the stored value is empty
173    #[inline]
174    pub fn is_empty(&self) -> bool {
175        self.len() == 0
176    }
177}
178
179/// Version of a key-value pair for lock-free access
180///
181/// Uses inline storage for small values to eliminate heap allocation
182/// and improve cache locality. Most database values (80%+) fit inline.
183#[derive(Debug)]
184pub struct LockFreeVersion {
185    /// The value with optimized inline storage
186    pub storage: ValueStorage,
187    /// Transaction that created this version
188    pub txn_id: u64,
189    /// Commit timestamp (0 = uncommitted)
190    pub commit_ts: AtomicU64,
191    /// Next version in chain (older)
192    pub next: AtomicPtr<LockFreeVersion>,
193}
194
195impl LockFreeVersion {
196    /// Create a new uncommitted version with value slice (zero-copy for inline)
197    #[inline]
198    pub fn new_from_slice(value: Option<&[u8]>, txn_id: u64) -> Self {
199        Self {
200            storage: ValueStorage::new(value),
201            txn_id,
202            commit_ts: AtomicU64::new(0),
203            next: AtomicPtr::new(ptr::null_mut()),
204        }
205    }
206
207    /// Create a new uncommitted version (legacy API - accepts owned Vec)
208    pub fn new(value: Option<Vec<u8>>, txn_id: u64) -> Self {
209        Self::new_from_slice(value.as_deref(), txn_id)
210    }
211
212    /// Get the value as bytes (zero-copy for inline values)
213    #[inline]
214    pub fn get_value(&self) -> Option<&[u8]> {
215        self.storage.as_bytes()
216    }
217
218    /// Get the value as owned Vec (for compatibility)
219    ///
220    /// Note: Prefer `get_value()` to avoid allocation
221    #[inline]
222    pub fn value_cloned(&self) -> Option<Vec<u8>> {
223        self.storage.as_bytes().map(|v| v.to_vec())
224    }
225
226    /// Check if committed
227    #[inline]
228    pub fn is_committed(&self) -> bool {
229        self.commit_ts.load(Ordering::Acquire) > 0
230    }
231
232    /// Get commit timestamp
233    #[inline]
234    pub fn get_commit_ts(&self) -> u64 {
235        self.commit_ts.load(Ordering::Acquire)
236    }
237
238    /// Set commit timestamp
239    #[inline]
240    pub fn set_commit_ts(&self, ts: u64) {
241        self.commit_ts.store(ts, Ordering::Release);
242    }
243
244    /// Check if value is stored inline (for diagnostics)
245    #[inline]
246    pub fn is_inline(&self) -> bool {
247        self.storage.is_inline()
248    }
249}
250
251/// Fat-node for version chain (Rec 2: Lock-Free Fat-Node Version Chain)
252///
253/// Groups up to 8 version pointers per node, reducing pointer chases from
254/// O(v) to O(v/8). Slot pointers occupy 64 bytes = 1 cache line, so scanning
255/// all 8 slots costs a single cache-line fetch instead of 8 random chases.
256///
257/// Layout:
258/// - `count` (AtomicU8): number of occupied slots (0..FAT_NODE_SLOTS)
259/// - `slots`: array of AtomicPtr<LockFreeVersion>, newest at index `count-1`
260/// - `next`: pointer to older FatNode
261pub struct FatNode {
262    /// Number of valid version pointers in `slots` (0..=FAT_NODE_SLOTS)
263    count: AtomicU8,
264    /// Version pointers, newest at `count-1`, oldest at 0
265    slots: [AtomicPtr<LockFreeVersion>; FAT_NODE_SLOTS],
266    /// Next (older) fat node in the chain
267    next: AtomicPtr<FatNode>,
268}
269
270impl FatNode {
271    /// Create a new fat node with one initial version and a link to older node
272    fn new_with_first(version: *mut LockFreeVersion, older: *mut FatNode) -> Self {
273        let slots = std::array::from_fn(|i| {
274            if i == 0 {
275                AtomicPtr::new(version)
276            } else {
277                AtomicPtr::new(ptr::null_mut())
278            }
279        });
280        Self {
281            count: AtomicU8::new(1),
282            slots,
283            next: AtomicPtr::new(older),
284        }
285    }
286
287    /// Try to append a version to this fat node. Returns Ok(()) if successful,
288    /// Err(version_ptr) if the node is full.
289    ///
290    /// Thread-safety: CAS on `count` serializes slot reservation. The reserving
291    /// thread then publishes the pointer via Release store on the slot.
292    #[inline]
293    fn try_push(
294        &self,
295        version: *mut LockFreeVersion,
296    ) -> std::result::Result<(), *mut LockFreeVersion> {
297        loop {
298            let c = self.count.load(Ordering::Acquire);
299            if c as usize >= FAT_NODE_SLOTS {
300                return Err(version); // Full
301            }
302            // Reserve slot `c` by CAS count → c+1
303            match self
304                .count
305                .compare_exchange(c, c + 1, Ordering::AcqRel, Ordering::Acquire)
306            {
307                Ok(_) => {
308                    // We own slot `c` — publish the version pointer
309                    self.slots[c as usize].store(version, Ordering::Release);
310                    return Ok(());
311                }
312                Err(_) => continue, // Another thread won; retry
313            }
314        }
315    }
316
317    /// Get version pointer at slot index (must be < count)
318    #[inline]
319    fn slot(&self, idx: u8) -> *mut LockFreeVersion {
320        self.slots[idx as usize].load(Ordering::Acquire)
321    }
322
323    /// Iterate versions newest-first (index count-1 down to 0)
324    #[inline]
325    fn iter_newest_first(&self) -> impl Iterator<Item = &LockFreeVersion> {
326        let count = self.count.load(Ordering::Acquire);
327        (0..count).rev().filter_map(move |i| {
328            let ptr = self.slots[i as usize].load(Ordering::Acquire);
329            if ptr.is_null() {
330                None
331            } else {
332                Some(unsafe { &*ptr })
333            }
334        })
335    }
336}
337
338/// Lock-free version chain using fat-node grouping (Rec 2)
339///
340/// Instead of a singly-linked list of individual versions, versions are grouped
341/// into fat nodes of 8. This reduces pointer chases from O(v) to O(v/8) since
342/// scanning 8 slots within a fat node hits the same cache line.
343pub struct LockFreeVersionChain {
344    /// Head fat node (contains the most recent versions)
345    head: AtomicPtr<FatNode>,
346}
347
348impl Default for LockFreeVersionChain {
349    fn default() -> Self {
350        Self::new()
351    }
352}
353
354impl LockFreeVersionChain {
355    /// Create empty version chain
356    pub fn new() -> Self {
357        Self {
358            head: AtomicPtr::new(ptr::null_mut()),
359        }
360    }
361
362    /// Add a new uncommitted version
363    ///
364    /// Returns error if there's already an uncommitted version from another txn
365    pub fn add_uncommitted(&self, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
366        let new_version = Box::into_raw(Box::new(LockFreeVersion::new(value, txn_id)));
367
368        loop {
369            let head = self.head.load(Ordering::Acquire);
370
371            // Check for write-write conflict: inspect the newest version
372            if !head.is_null() {
373                let fat = unsafe { &*head };
374                let count = fat.count.load(Ordering::Acquire);
375                if count > 0 {
376                    let newest = fat.slot(count - 1);
377                    if !newest.is_null() {
378                        let newest_ref = unsafe { &*newest };
379                        if !newest_ref.is_committed() && newest_ref.txn_id != txn_id {
380                            unsafe {
381                                drop(Box::from_raw(new_version));
382                            }
383                            return Err(SochDBError::Internal("Write-write conflict".into()));
384                        }
385                    }
386                }
387
388                // Try to push into existing fat node
389                match fat.try_push(new_version) {
390                    Ok(()) => return Ok(()),
391                    Err(_) => {
392                        // Fat node is full — allocate new one linking to current head
393                        let new_fat =
394                            Box::into_raw(Box::new(FatNode::new_with_first(new_version, head)));
395                        match self.head.compare_exchange(
396                            head,
397                            new_fat,
398                            Ordering::AcqRel,
399                            Ordering::Acquire,
400                        ) {
401                            Ok(_) => return Ok(()),
402                            Err(_) => {
403                                // CAS failed — reclaim the fat node, keep the version for retry
404                                unsafe {
405                                    // Detach version from fat node before dropping it
406                                    (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
407                                    (*new_fat).count.store(0, Ordering::Relaxed);
408                                    drop(Box::from_raw(new_fat));
409                                }
410                                continue; // Retry from the top
411                            }
412                        }
413                    }
414                }
415            } else {
416                // No head — allocate first fat node
417                let new_fat = Box::into_raw(Box::new(FatNode::new_with_first(
418                    new_version,
419                    ptr::null_mut(),
420                )));
421                match self
422                    .head
423                    .compare_exchange(head, new_fat, Ordering::AcqRel, Ordering::Acquire)
424                {
425                    Ok(_) => return Ok(()),
426                    Err(_) => {
427                        unsafe {
428                            (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
429                            (*new_fat).count.store(0, Ordering::Relaxed);
430                            drop(Box::from_raw(new_fat));
431                        }
432                        continue;
433                    }
434                }
435            }
436        }
437    }
438
439    /// Commit a version
440    pub fn commit(&self, txn_id: u64, commit_ts: u64) -> bool {
441        let mut fat_ptr = self.head.load(Ordering::Acquire);
442
443        while !fat_ptr.is_null() {
444            let fat = unsafe { &*fat_ptr };
445            // Scan this fat node's slots (newest first)
446            for ver in fat.iter_newest_first() {
447                if ver.txn_id == txn_id && !ver.is_committed() {
448                    ver.set_commit_ts(commit_ts);
449                    return true;
450                }
451            }
452            fat_ptr = fat.next.load(Ordering::Acquire);
453        }
454
455        false
456    }
457
458    /// Read at a snapshot timestamp
459    ///
460    /// Returns the most recent committed version visible at snapshot_ts,
461    /// or an uncommitted version if it belongs to current_txn_id.
462    pub fn read_at(
463        &self,
464        snapshot_ts: u64,
465        current_txn_id: Option<u64>,
466    ) -> Option<&LockFreeVersion> {
467        let mut fat_ptr = self.head.load(Ordering::Acquire);
468
469        while !fat_ptr.is_null() {
470            let fat = unsafe { &*fat_ptr };
471            // Scan this fat node's slots (newest first)
472            for version in fat.iter_newest_first() {
473                // Check if this is our own uncommitted write
474                if let Some(txn_id) = current_txn_id
475                    && version.txn_id == txn_id
476                    && !version.is_committed()
477                {
478                    return Some(version);
479                }
480
481                // Check if this version is visible
482                let commit_ts = version.get_commit_ts();
483                if commit_ts > 0 && commit_ts < snapshot_ts {
484                    return Some(version);
485                }
486            }
487            fat_ptr = fat.next.load(Ordering::Acquire);
488        }
489
490        None
491    }
492
493    /// Check if there's an uncommitted version by another transaction
494    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
495        let head = self.head.load(Ordering::Acquire);
496
497        if !head.is_null() {
498            let fat = unsafe { &*head };
499            let count = fat.count.load(Ordering::Acquire);
500            if count > 0 {
501                let newest = fat.slot(count - 1);
502                if !newest.is_null() {
503                    let version = unsafe { &*newest };
504                    return !version.is_committed() && version.txn_id != my_txn_id;
505                }
506            }
507        }
508
509        false
510    }
511}
512
513/// Thread-local hazard pointer record
514///
515/// Cache-line aligned to prevent false sharing
516#[repr(C, align(64))]
517struct HazardRecord {
518    /// Protected pointers
519    hazard: [AtomicPtr<LockFreeVersion>; HP_PER_THREAD],
520    /// Active flag (non-zero if thread is using this record)
521    active: AtomicU64,
522}
523
524impl HazardRecord {
525    const fn new() -> Self {
526        Self {
527            hazard: [
528                AtomicPtr::new(ptr::null_mut()),
529                AtomicPtr::new(ptr::null_mut()),
530            ],
531            active: AtomicU64::new(0),
532        }
533    }
534
535    /// Acquire this record for a thread
536    fn try_acquire(&self, thread_id: u64) -> bool {
537        self.active
538            .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Acquire)
539            .is_ok()
540    }
541
542    /// Release this record
543    #[allow(dead_code)]
544    fn release(&self) {
545        // Clear hazard pointers first
546        for hp in &self.hazard {
547            hp.store(ptr::null_mut(), Ordering::Release);
548        }
549        self.active.store(0, Ordering::Release);
550    }
551}
552
553/// Hazard pointer domain for safe memory reclamation
554pub struct HazardDomain {
555    /// Hazard records (one per potential thread)
556    records: Vec<HazardRecord>,
557    /// Retired nodes pending reclamation
558    retired: Mutex<Vec<*mut LockFreeVersion>>,
559}
560
561impl HazardDomain {
562    /// Create a new hazard domain
563    pub fn new(max_threads: usize) -> Self {
564        let mut records = Vec::with_capacity(max_threads);
565        for _ in 0..max_threads {
566            records.push(HazardRecord::new());
567        }
568
569        Self {
570            records,
571            retired: Mutex::new(Vec::with_capacity(RECLAMATION_THRESHOLD * 2)),
572        }
573    }
574
575    /// Get a hazard record for the current thread
576    fn get_record(&self) -> Option<&HazardRecord> {
577        let thread_id = thread_id::get() as u64;
578
579        // First try to find already owned record
580        for record in &self.records {
581            if record.active.load(Ordering::Acquire) == thread_id {
582                return Some(record);
583            }
584        }
585
586        // Try to acquire a new record
587        self.records
588            .iter()
589            .find(|record| record.try_acquire(thread_id))
590    }
591
592    /// Protect a pointer with hazard pointer
593    #[inline]
594    pub fn protect(&self, ptr: *mut LockFreeVersion, slot: usize) -> bool {
595        if let Some(record) = self.get_record()
596            && slot < HP_PER_THREAD
597        {
598            record.hazard[slot].store(ptr, Ordering::Release);
599            std::sync::atomic::fence(Ordering::SeqCst);
600            return true;
601        }
602        false
603    }
604
605    /// Clear a hazard pointer slot
606    #[inline]
607    pub fn clear(&self, slot: usize) {
608        if let Some(record) = self.get_record()
609            && slot < HP_PER_THREAD
610        {
611            record.hazard[slot].store(ptr::null_mut(), Ordering::Release);
612        }
613    }
614
615    /// Retire a pointer for later reclamation
616    pub fn retire(&self, ptr: *mut LockFreeVersion) {
617        let mut retired = self.retired.lock();
618        retired.push(ptr);
619
620        // Attempt reclamation if threshold reached
621        if retired.len() >= RECLAMATION_THRESHOLD {
622            self.try_reclaim(&mut retired);
623        }
624    }
625
626    /// Try to reclaim retired pointers not protected by any hazard pointer
627    fn try_reclaim(&self, retired: &mut Vec<*mut LockFreeVersion>) {
628        // Collect all active hazard pointers
629        let mut protected: HashSet<usize> = HashSet::new();
630
631        for record in &self.records {
632            if record.active.load(Ordering::Acquire) != 0 {
633                for hp in &record.hazard {
634                    let ptr = hp.load(Ordering::Acquire);
635                    if !ptr.is_null() {
636                        protected.insert(ptr as usize);
637                    }
638                }
639            }
640        }
641
642        // Reclaim unprotected nodes
643        let mut still_retired = Vec::new();
644        for ptr in retired.drain(..) {
645            if protected.contains(&(ptr as usize)) {
646                still_retired.push(ptr);
647            } else {
648                // Safe to reclaim
649                unsafe {
650                    drop(Box::from_raw(ptr));
651                }
652            }
653        }
654
655        *retired = still_retired;
656    }
657}
658
659impl Drop for HazardDomain {
660    fn drop(&mut self) {
661        // Reclaim all retired nodes
662        let mut retired = self.retired.lock();
663        for ptr in retired.drain(..) {
664            unsafe {
665                drop(Box::from_raw(ptr));
666            }
667        }
668    }
669}
670
671// Thread ID helper (simple implementation)
672mod thread_id {
673    use std::sync::atomic::{AtomicUsize, Ordering};
674
675    static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
676
677    thread_local! {
678        static THREAD_ID: usize = NEXT_ID.fetch_add(1, Ordering::Relaxed);
679    }
680
681    pub fn get() -> usize {
682        THREAD_ID.with(|id| *id)
683    }
684}
685
686/// Lock-free memtable with hazard pointer protection
687pub struct LockFreeMemTable {
688    /// Concurrent hash map (lock-free for reads, fine-grained locking for writes)
689    data: DashMap<Vec<u8>, LockFreeVersionChain>,
690    /// Hazard pointer domain
691    hazard_domain: HazardDomain,
692    /// Approximate size in bytes
693    size_bytes: AtomicUsize,
694}
695
696impl LockFreeMemTable {
697    /// Create a new lock-free memtable
698    pub fn new() -> Self {
699        Self {
700            data: DashMap::new(),
701            hazard_domain: HazardDomain::new(MAX_THREADS),
702            size_bytes: AtomicUsize::new(0),
703        }
704    }
705
706    /// Read a value at snapshot timestamp
707    ///
708    /// This is a lock-free read protected by hazard pointers.
709    /// Returns a cloned value for safety across hazard pointer boundaries.
710    pub fn read(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
711        let chain = self.data.get(key)?;
712
713        // Read with hazard pointer protection
714        if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
715            // Protect the version
716            let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
717            self.hazard_domain.protect(ptr, 0);
718
719            // Get value using optimized inline storage
720            // Clone is still needed due to hazard pointer lifetime
721            let result = version.value_cloned();
722
723            // Clear hazard pointer
724            self.hazard_domain.clear(0);
725
726            result
727        } else {
728            None
729        }
730    }
731
732    /// Read a value at snapshot timestamp with zero-copy callback
733    ///
734    /// This is an optimized read path that avoids cloning for inline values.
735    /// The callback receives a reference to the value, avoiding allocation.
736    ///
737    /// # Arguments
738    /// * `key` - The key to read
739    /// * `snapshot_ts` - Snapshot timestamp for visibility
740    /// * `txn_id` - Current transaction ID (to see own uncommitted writes)
741    /// * `f` - Callback that receives the value reference
742    ///
743    /// # Returns
744    /// The result of the callback, or None if key not found
745    #[inline]
746    pub fn read_with<F, R>(
747        &self,
748        key: &[u8],
749        snapshot_ts: u64,
750        txn_id: Option<u64>,
751        f: F,
752    ) -> Option<R>
753    where
754        F: FnOnce(&[u8]) -> R,
755    {
756        let chain = self.data.get(key)?;
757
758        if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
759            // Protect the version
760            let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
761            self.hazard_domain.protect(ptr, 0);
762
763            // Call callback with value reference (zero-copy for inline)
764            let result = version.get_value().map(f);
765
766            // Clear hazard pointer
767            self.hazard_domain.clear(0);
768
769            result
770        } else {
771            None
772        }
773    }
774
775    /// Write a value (creates uncommitted version)
776    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
777        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
778
779        // Get or create version chain
780        let chain = self.data.entry(key.clone()).or_default();
781
782        // Add uncommitted version
783        chain.add_uncommitted(value, txn_id)?;
784
785        // Update size estimate
786        self.size_bytes
787            .fetch_add(key.len() + value_size + 64, Ordering::Relaxed);
788
789        Ok(())
790    }
791
792    /// Commit a transaction's writes
793    pub fn commit(&self, txn_id: u64, commit_ts: u64, keys: &[Vec<u8>]) {
794        for key in keys {
795            if let Some(chain) = self.data.get(key) {
796                chain.commit(txn_id, commit_ts);
797            }
798        }
799    }
800
801    /// Check for write conflict
802    pub fn has_write_conflict(&self, key: &[u8], txn_id: u64) -> bool {
803        if let Some(chain) = self.data.get(key) {
804            chain.has_write_conflict(txn_id)
805        } else {
806            false
807        }
808    }
809
810    /// Get approximate size in bytes
811    pub fn size_bytes(&self) -> usize {
812        self.size_bytes.load(Ordering::Relaxed)
813    }
814
815    /// Get number of keys
816    pub fn len(&self) -> usize {
817        self.data.len()
818    }
819
820    /// Check if empty
821    pub fn is_empty(&self) -> bool {
822        self.data.is_empty()
823    }
824}
825
826// Safety: LockFreeMemTable uses atomic operations and proper synchronization
827// for all shared data access. The raw pointers in HazardDomain are only
828// dereferenced under proper hazard pointer protection.
829unsafe impl Send for LockFreeMemTable {}
830unsafe impl Sync for LockFreeMemTable {}
831
832impl Default for LockFreeMemTable {
833    fn default() -> Self {
834        Self::new()
835    }
836}
837
838#[cfg(test)]
839mod tests {
840    use super::*;
841    use std::sync::Arc;
842    use std::thread;
843
844    #[test]
845    fn test_basic_write_read() {
846        let memtable = LockFreeMemTable::new();
847
848        // Write
849        memtable
850            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
851            .unwrap();
852
853        // Read own uncommitted write
854        let val = memtable.read(b"key1", 100, Some(1));
855        assert_eq!(val, Some(b"value1".to_vec()));
856
857        // Cannot read uncommitted from other txn
858        let val = memtable.read(b"key1", 100, Some(2));
859        assert!(val.is_none());
860
861        // Commit and read
862        memtable.commit(1, 50, &[b"key1".to_vec()]);
863        let val = memtable.read(b"key1", 100, None);
864        assert_eq!(val, Some(b"value1".to_vec()));
865    }
866
867    #[test]
868    fn test_snapshot_isolation() {
869        let memtable = LockFreeMemTable::new();
870
871        // Write and commit at ts=10
872        memtable
873            .write(b"key".to_vec(), Some(b"v1".to_vec()), 1)
874            .unwrap();
875        memtable.commit(1, 10, &[b"key".to_vec()]);
876
877        // Write and commit at ts=20
878        memtable
879            .write(b"key".to_vec(), Some(b"v2".to_vec()), 2)
880            .unwrap();
881        memtable.commit(2, 20, &[b"key".to_vec()]);
882
883        // Snapshot at ts=15 sees v1
884        assert_eq!(memtable.read(b"key", 15, None), Some(b"v1".to_vec()));
885
886        // Snapshot at ts=25 sees v2
887        assert_eq!(memtable.read(b"key", 25, None), Some(b"v2".to_vec()));
888    }
889
890    #[test]
891    fn test_write_conflict() {
892        let memtable = LockFreeMemTable::new();
893
894        // First write
895        memtable
896            .write(b"key".to_vec(), Some(b"v1".to_vec()), 1)
897            .unwrap();
898
899        // Conflicting write should fail
900        let result = memtable.write(b"key".to_vec(), Some(b"v2".to_vec()), 2);
901        assert!(result.is_err());
902
903        // Same txn can write again
904        let result = memtable.write(b"key".to_vec(), Some(b"v1_updated".to_vec()), 1);
905        assert!(result.is_ok());
906    }
907
908    #[test]
909    fn test_concurrent_reads() {
910        let memtable = Arc::new(LockFreeMemTable::new());
911
912        // Setup data
913        for i in 0..100 {
914            let key = format!("key{}", i).into_bytes();
915            let val = format!("value{}", i).into_bytes();
916            memtable.write(key.clone(), Some(val), 1).unwrap();
917        }
918        memtable.commit(
919            1,
920            10,
921            &(0..100)
922                .map(|i| format!("key{}", i).into_bytes())
923                .collect::<Vec<_>>(),
924        );
925
926        // Concurrent reads
927        let handles: Vec<_> = (0..8)
928            .map(|t| {
929                let mt = Arc::clone(&memtable);
930                thread::spawn(move || {
931                    for i in 0..100 {
932                        let key = format!("key{}", i).into_bytes();
933                        let expected = format!("value{}", i).into_bytes();
934                        let val = mt.read(&key, 100, None);
935                        assert_eq!(val, Some(expected), "Thread {} failed at key{}", t, i);
936                    }
937                })
938            })
939            .collect();
940
941        for h in handles {
942            h.join().unwrap();
943        }
944    }
945
946    #[test]
947    fn test_inline_storage() {
948        // Test that small values are stored inline
949        let small_value = b"small".to_vec();
950        let version = LockFreeVersion::new(Some(small_value.clone()), 1);
951        assert!(version.is_inline(), "Small values should be inline");
952        assert_eq!(version.get_value(), Some(small_value.as_slice()));
953
954        // Test that large values are stored on heap
955        let large_value = vec![42u8; 100]; // > INLINE_VALUE_SIZE
956        let version = LockFreeVersion::new(Some(large_value.clone()), 2);
957        assert!(!version.is_inline(), "Large values should be on heap");
958        assert_eq!(version.get_value(), Some(large_value.as_slice()));
959
960        // Test tombstone
961        let version = LockFreeVersion::new(None, 3);
962        assert!(version.storage.is_tombstone());
963        assert_eq!(version.get_value(), None);
964    }
965
966    #[test]
967    fn test_inline_threshold() {
968        // Exactly at threshold should be inline
969        let value = vec![0u8; INLINE_VALUE_SIZE];
970        let version = LockFreeVersion::new(Some(value.clone()), 1);
971        assert!(version.is_inline(), "Values at threshold should be inline");
972
973        // One byte over threshold should be heap
974        let value = vec![0u8; INLINE_VALUE_SIZE + 1];
975        let version = LockFreeVersion::new(Some(value), 2);
976        assert!(
977            !version.is_inline(),
978            "Values over threshold should be on heap"
979        );
980    }
981
982    #[test]
983    fn test_read_with_callback() {
984        let memtable = LockFreeMemTable::new();
985
986        memtable
987            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
988            .unwrap();
989        memtable.commit(1, 10, &[b"key1".to_vec()]);
990
991        // Use read_with for zero-copy access
992        let len = memtable.read_with(b"key1", 100, None, |v| v.len());
993        assert_eq!(len, Some(6)); // "value1".len()
994
995        // Verify callback receives correct data
996        let matches = memtable.read_with(b"key1", 100, None, |v| v == b"value1");
997        assert_eq!(matches, Some(true));
998    }
999
1000    #[test]
1001    fn test_fat_node_overflow() {
1002        // Write more than FAT_NODE_SLOTS versions to a single key
1003        // to verify fat node chaining works correctly
1004        let memtable = LockFreeMemTable::new();
1005
1006        for i in 0..12u64 {
1007            // Each write from a different committed txn
1008            memtable
1009                .write(b"key".to_vec(), Some(format!("v{}", i).into_bytes()), i + 1)
1010                .unwrap();
1011            memtable.commit(i + 1, (i + 1) * 10, &[b"key".to_vec()]);
1012        }
1013
1014        // Read at latest snapshot should return v11 (committed at ts=120)
1015        let val = memtable.read(b"key", 200, None);
1016        assert_eq!(val, Some(b"v11".to_vec()));
1017
1018        // Read at ts=55 should return v4 (committed at ts=50)
1019        let val = memtable.read(b"key", 55, None);
1020        assert_eq!(val, Some(b"v4".to_vec()));
1021
1022        // Read at ts=5 should return None (v0 committed at ts=10)
1023        let val = memtable.read(b"key", 5, None);
1024        assert_eq!(val, None);
1025    }
1026
1027    #[test]
1028    fn test_fat_node_concurrent_writes() {
1029        use std::sync::Arc;
1030        use std::thread;
1031
1032        let memtable = Arc::new(LockFreeMemTable::new());
1033
1034        // 4 threads writing to different keys concurrently
1035        let mut handles = Vec::new();
1036        for t in 0..4u64 {
1037            let mt = Arc::clone(&memtable);
1038            handles.push(thread::spawn(move || {
1039                for i in 0..20u64 {
1040                    let key = format!("k{}-{}", t, i).into_bytes();
1041                    let val = format!("v{}-{}", t, i).into_bytes();
1042                    let txn_id = t * 1000 + i + 1;
1043                    mt.write(key.clone(), Some(val), txn_id).unwrap();
1044                    mt.commit(txn_id, txn_id * 10, &[key]);
1045                }
1046            }));
1047        }
1048
1049        for h in handles {
1050            h.join().unwrap();
1051        }
1052
1053        // Verify all 80 keys are readable
1054        for t in 0..4u64 {
1055            for i in 0..20u64 {
1056                let key = format!("k{}-{}", t, i).into_bytes();
1057                let val = memtable.read(&key, u64::MAX, None);
1058                assert_eq!(
1059                    val,
1060                    Some(format!("v{}-{}", t, i).into_bytes()),
1061                    "Missing key k{}-{}",
1062                    t,
1063                    i
1064                );
1065            }
1066        }
1067    }
1068}