1use dashmap::DashMap;
44use parking_lot::Mutex;
45use std::collections::HashMap;
46use std::sync::Arc;
47use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
48
/// Identifier of a table; table-level intent locks are keyed by it.
pub type TableId = u64;

/// Identifier of a row within a table (128 bits wide).
pub type RowId = u128;

/// Identifier of the transaction that owns a lock.
pub type TxnId = u64;
57
/// Table-level lock modes used for multi-granularity locking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentLock {
    /// IS: the holder intends to take shared locks on individual rows.
    IntentShared,
    /// IX: the holder intends to take exclusive locks on individual rows.
    IntentExclusive,
    /// S: shared lock on the whole table.
    Shared,
    /// X: exclusive lock on the whole table.
    Exclusive,
}

impl IntentLock {
    /// Returns `true` if a lock in mode `self` may coexist with one in `other`
    /// held by a different transaction (the standard IS/IX/S/X matrix).
    pub fn is_compatible(&self, other: &IntentLock) -> bool {
        use IntentLock::*;
        match self {
            // IS conflicts only with a full table X lock.
            IntentShared => !matches!(other, Exclusive),
            // IX coexists only with other intent locks.
            IntentExclusive => matches!(other, IntentShared | IntentExclusive),
            // S coexists with readers: IS and S.
            Shared => matches!(other, IntentShared | Shared),
            // X coexists with nothing.
            Exclusive => false,
        }
    }
}
87
/// Row-level lock modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    /// Read lock; any number of transactions may share it.
    Shared,
    /// Write lock; excludes every other holder.
    Exclusive,
}

impl LockMode {
    /// Two row locks held by different transactions are compatible only when
    /// both are `Shared`.
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        *self == LockMode::Shared && *other == LockMode::Shared
    }
}
102
103#[derive(Debug)]
105struct TableLockEntry {
106 mode: IntentLock,
107 holders: Vec<TxnId>,
108 }
113
114impl TableLockEntry {
115 fn new(mode: IntentLock, txn_id: TxnId) -> Self {
116 Self {
117 mode,
118 holders: vec![txn_id],
119 }
120 }
121}
122
123#[derive(Debug)]
125struct RowLockEntry {
126 mode: LockMode,
127 holders: Vec<TxnId>,
128}
129
130impl RowLockEntry {
131 fn new(mode: LockMode, txn_id: TxnId) -> Self {
132 Self {
133 mode,
134 holders: vec![txn_id],
135 }
136 }
137}
138
139pub struct ShardedLockTable {
142 shards: [Mutex<HashMap<RowId, RowLockEntry>>; 256],
143 stats: LockTableStats,
144}
145
146impl Default for ShardedLockTable {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl ShardedLockTable {
153 pub fn new() -> Self {
155 Self {
156 shards: std::array::from_fn(|_| Mutex::new(HashMap::new())),
157 stats: LockTableStats::default(),
158 }
159 }
160
161 #[inline]
163 fn shard_index(&self, row_id: RowId) -> usize {
164 ((row_id >> 64) as usize ^ (row_id as usize)) % 256
166 }
167
168 pub fn try_lock(&self, row_id: RowId, mode: LockMode, txn_id: TxnId) -> LockResult {
170 let shard_idx = self.shard_index(row_id);
171 let mut shard = self.shards[shard_idx].lock();
172
173 if let Some(entry) = shard.get_mut(&row_id) {
174 if entry.holders.contains(&txn_id) {
176 if entry.mode == LockMode::Shared && mode == LockMode::Exclusive {
178 if entry.holders.len() == 1 {
179 entry.mode = LockMode::Exclusive;
180 self.stats.upgrades.fetch_add(1, Ordering::Relaxed);
181 return LockResult::Acquired;
182 } else {
183 self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
184 return LockResult::WouldBlock;
185 }
186 }
187 return LockResult::AlreadyHeld;
188 }
189
190 if entry.mode.is_compatible(&mode) {
192 entry.holders.push(txn_id);
193 self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed);
194 return LockResult::Acquired;
195 }
196
197 self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
198 return LockResult::WouldBlock;
199 }
200
201 shard.insert(row_id, RowLockEntry::new(mode, txn_id));
203 match mode {
204 LockMode::Shared => self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed),
205 LockMode::Exclusive => self
206 .stats
207 .exclusive_acquired
208 .fetch_add(1, Ordering::Relaxed),
209 };
210 LockResult::Acquired
211 }
212
213 pub fn unlock(&self, row_id: RowId, txn_id: TxnId) -> bool {
215 let shard_idx = self.shard_index(row_id);
216 let mut shard = self.shards[shard_idx].lock();
217
218 if let Some(entry) = shard.get_mut(&row_id)
219 && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
220 {
221 entry.holders.remove(pos);
222 self.stats.released.fetch_add(1, Ordering::Relaxed);
223
224 if entry.holders.is_empty() {
225 shard.remove(&row_id);
226 }
227 return true;
228 }
229 false
230 }
231
232 pub fn unlock_all(&self, txn_id: TxnId) -> usize {
241 let mut count = 0;
242 for shard in &self.shards {
243 let mut shard_guard = shard.lock();
244 let to_remove: Vec<RowId> = shard_guard
245 .iter()
246 .filter(|(_, entry)| entry.holders.contains(&txn_id))
247 .map(|(&row_id, _)| row_id)
248 .collect();
249
250 for row_id in to_remove {
251 if let Some(entry) = shard_guard.get_mut(&row_id)
252 && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
253 {
254 entry.holders.remove(pos);
255 count += 1;
256
257 if entry.holders.is_empty() {
258 shard_guard.remove(&row_id);
259 }
260 }
261 }
262 }
263 self.stats
264 .released
265 .fetch_add(count as u64, Ordering::Relaxed);
266 count
267 }
268
269 pub fn try_lock_tracked(
274 &self,
275 row_id: RowId,
276 mode: LockMode,
277 txn_id: TxnId,
278 lock_set: &mut TransactionLockSet,
279 ) -> LockResult {
280 let result = self.try_lock(row_id, mode, txn_id);
281 if matches!(result, LockResult::Acquired) {
282 let shard_idx = self.shard_index(row_id);
283 lock_set.record(shard_idx, row_id);
284 }
285 result
286 }
287
288 pub fn unlock_all_tracked(&self, txn_id: TxnId, lock_set: &TransactionLockSet) -> usize {
293 let mut count = 0;
294 for &(shard_idx, row_id) in &lock_set.locks {
295 let mut shard = self.shards[shard_idx].lock();
296 if let Some(entry) = shard.get_mut(&row_id)
297 && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
298 {
299 entry.holders.remove(pos);
300 count += 1;
301 if entry.holders.is_empty() {
302 shard.remove(&row_id);
303 }
304 }
305 }
306 self.stats
307 .released
308 .fetch_add(count as u64, Ordering::Relaxed);
309 count
310 }
311
312 pub fn stats(&self) -> &LockTableStats {
314 &self.stats
315 }
316}
317
318#[derive(Debug, Default)]
327pub struct TransactionLockSet {
328 locks: Vec<(usize, RowId)>,
330}
331
332impl TransactionLockSet {
333 pub fn new() -> Self {
335 Self {
336 locks: Vec::with_capacity(8),
337 }
338 }
339
340 fn record(&mut self, shard_idx: usize, row_id: RowId) {
342 self.locks.push((shard_idx, row_id));
343 }
344
345 pub fn len(&self) -> usize {
347 self.locks.len()
348 }
349
350 pub fn is_empty(&self) -> bool {
352 self.locks.is_empty()
353 }
354
355 pub fn clear(&mut self) {
357 self.locks.clear();
358 }
359}
360
/// Activity counters for a `ShardedLockTable`.
#[derive(Debug, Default)]
pub struct LockTableStats {
    /// Shared grants: new entries created in `Shared` mode plus joins onto shared rows.
    pub shared_acquired: AtomicU64,
    /// Exclusive grants on rows that had no entry.
    pub exclusive_acquired: AtomicU64,
    /// Shared-to-exclusive upgrades by a row's only holder.
    pub upgrades: AtomicU64,
    /// Requests answered with `LockResult::WouldBlock`.
    pub conflicts: AtomicU64,
    /// Individual holder releases.
    pub released: AtomicU64,
}
370
/// Outcome of a non-blocking lock request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The lock was granted by this call, including mode upgrades.
    Acquired,
    /// The requester already held the lock; nothing changed.
    AlreadyHeld,
    /// The request conflicts with another holder; the caller must wait or retry.
    WouldBlock,
    /// Not produced by `ShardedLockTable` or `LockManager` in this module.
    Conflict,
}
385
386pub struct LockManager {
388 table_locks: DashMap<TableId, TableLockEntry>,
390 row_locks: DashMap<TableId, Arc<ShardedLockTable>>,
392 epoch: AtomicU64,
394 stats: LockManagerStats,
396}
397
398impl Default for LockManager {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
404impl LockManager {
405 pub fn new() -> Self {
407 Self {
408 table_locks: DashMap::new(),
409 row_locks: DashMap::new(),
410 epoch: AtomicU64::new(0),
411 stats: LockManagerStats::default(),
412 }
413 }
414
415 pub fn lock_table(&self, table_id: TableId, mode: IntentLock, txn_id: TxnId) -> LockResult {
417 use dashmap::mapref::entry::Entry;
418
419 match self.table_locks.entry(table_id) {
420 Entry::Vacant(vacant) => {
421 vacant.insert(TableLockEntry::new(mode, txn_id));
422 self.stats
423 .table_locks_acquired
424 .fetch_add(1, Ordering::Relaxed);
425 LockResult::Acquired
426 }
427 Entry::Occupied(mut occupied) => {
428 let entry = occupied.get_mut();
429
430 if entry.holders.contains(&txn_id) {
432 return LockResult::AlreadyHeld;
433 }
434
435 if entry.mode.is_compatible(&mode) {
437 entry.holders.push(txn_id);
438 self.stats
439 .table_locks_acquired
440 .fetch_add(1, Ordering::Relaxed);
441 return LockResult::Acquired;
442 }
443
444 self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
445 LockResult::WouldBlock
446 }
447 }
448 }
449
450 pub fn unlock_table(&self, table_id: TableId, txn_id: TxnId) -> bool {
452 if let Some(mut entry) = self.table_locks.get_mut(&table_id)
453 && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
454 {
455 entry.holders.remove(pos);
456 self.stats
457 .table_locks_released
458 .fetch_add(1, Ordering::Relaxed);
459
460 if entry.holders.is_empty() {
461 drop(entry);
462 self.table_locks.remove(&table_id);
463 }
464 return true;
465 }
466 false
467 }
468
469 fn get_row_lock_table(&self, table_id: TableId) -> Arc<ShardedLockTable> {
471 self.row_locks
472 .entry(table_id)
473 .or_insert_with(|| Arc::new(ShardedLockTable::new()))
474 .clone()
475 }
476
477 pub fn lock_row(
479 &self,
480 table_id: TableId,
481 row_id: RowId,
482 mode: LockMode,
483 txn_id: TxnId,
484 ) -> LockResult {
485 let intent_mode = match mode {
487 LockMode::Shared => IntentLock::IntentShared,
488 LockMode::Exclusive => IntentLock::IntentExclusive,
489 };
490
491 match self.lock_table(table_id, intent_mode, txn_id) {
492 LockResult::Acquired | LockResult::AlreadyHeld => {}
493 result => return result,
494 }
495
496 let row_locks = self.get_row_lock_table(table_id);
498 row_locks.try_lock(row_id, mode, txn_id)
499 }
500
501 pub fn unlock_row(&self, table_id: TableId, row_id: RowId, txn_id: TxnId) -> bool {
503 if let Some(row_locks) = self.row_locks.get(&table_id) {
504 return row_locks.unlock(row_id, txn_id);
505 }
506 false
507 }
508
509 pub fn release_all(&self, txn_id: TxnId) -> usize {
511 let mut count = 0;
512
513 for entry in self.row_locks.iter() {
515 count += entry.value().unlock_all(txn_id);
516 }
517
518 let table_ids: Vec<TableId> = self
520 .table_locks
521 .iter()
522 .filter(|e| e.value().holders.contains(&txn_id))
523 .map(|e| *e.key())
524 .collect();
525
526 for table_id in table_ids {
527 if self.unlock_table(table_id, txn_id) {
528 count += 1;
529 }
530 }
531
532 count
533 }
534
535 pub fn enter_epoch(&self) -> u64 {
537 self.epoch.fetch_add(1, Ordering::AcqRel)
538 }
539
540 pub fn current_epoch(&self) -> u64 {
542 self.epoch.load(Ordering::Acquire)
543 }
544
545 pub fn stats(&self) -> &LockManagerStats {
547 &self.stats
548 }
549}
550
/// Table-level activity counters for a `LockManager`.
#[derive(Debug, Default)]
pub struct LockManagerStats {
    /// Table locks granted to a new holder.
    pub table_locks_acquired: AtomicU64,
    /// Table locks released by a holder.
    pub table_locks_released: AtomicU64,
    /// Table lock requests answered with `LockResult::WouldBlock`.
    pub table_conflicts: AtomicU64,
}
558
/// Seqlock-style version counter for optimistic readers.
///
/// An even value means no writer is active. A writer makes it odd for the
/// duration of its critical section and publishes a new even value when done.
/// A reader takes [`read_version`](Self::read_version), reads the protected
/// data, then calls [`validate`](Self::validate). A `false` result means the
/// read may have overlapped a write and must be retried.
pub struct OptimisticVersion {
    version: AtomicU64,
}

impl Default for OptimisticVersion {
    fn default() -> Self {
        Self::new()
    }
}

impl OptimisticVersion {
    /// Creates a counter at version 0 (stable, no writer).
    pub fn new() -> Self {
        Self {
            version: AtomicU64::new(0),
        }
    }

    /// Loads the current version with `Acquire` ordering.
    #[inline]
    pub fn read_version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Returns `true` if `version` is even, i.e. no writer was active.
    #[inline]
    pub fn is_stable(&self, version: u64) -> bool {
        version & 1 == 0
    }

    /// Returns `true` if data read after observing `read_version` is consistent.
    ///
    /// Fails if `read_version` was taken while a writer was active (odd), or
    /// if the version has changed since. The `Acquire` fence orders the
    /// caller's preceding data reads before the re-load of the version.
    #[inline]
    pub fn validate(&self, read_version: u64) -> bool {
        std::sync::atomic::fence(Ordering::Acquire);
        self.is_stable(read_version) && self.version.load(Ordering::Relaxed) == read_version
    }

    /// Tries to start a write without waiting.
    ///
    /// Returns `None` if a writer is already active or another writer wins the race.
    pub fn try_write_begin(&self) -> Option<WriteGuard<'_>> {
        let current = self.version.load(Ordering::Acquire);
        if !self.is_stable(current) {
            return None;
        }

        self.version
            .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Relaxed)
            .ok()?;

        // The odd version must be ordered before the writer's data stores.
        // With this Release fence, a reader that reads (atomically) any store
        // made after this point and then runs the Acquire fence in `validate`
        // also observes the odd version, so its validation fails.
        std::sync::atomic::fence(Ordering::Release);

        Some(WriteGuard {
            version: &self.version,
            start_version: current,
        })
    }
}

/// Exclusive write access obtained from [`OptimisticVersion::try_write_begin`].
///
/// Finish with [`commit`](WriteGuard::commit) or [`abort`](WriteGuard::abort).
/// If the guard is dropped without either (e.g. during unwinding), a new
/// version is published as on commit. The protected data may have been
/// partially modified, so readers must not validate against the old version.
#[must_use = "dropping a WriteGuard immediately ends the write"]
pub struct WriteGuard<'a> {
    version: &'a AtomicU64,
    start_version: u64,
}

impl<'a> WriteGuard<'a> {
    /// Ends the write and publishes the new stable version `start + 2`.
    pub fn commit(self) {
        self.version
            .store(self.start_version.wrapping_add(2), Ordering::Release);
        std::mem::forget(self);
    }

    /// Ends the write and restores the version it started from.
    ///
    /// Only correct if the protected data was not modified. Readers that
    /// began before this write will still validate successfully afterwards.
    pub fn abort(self) {
        self.version.store(self.start_version, Ordering::Release);
        std::mem::forget(self);
    }
}

impl Drop for WriteGuard<'_> {
    fn drop(&mut self) {
        // Unknown how much data was changed: invalidate concurrent readers.
        self.version
            .store(self.start_version.wrapping_add(2), Ordering::Release);
    }
}
651
652pub struct EpochGuard {
654 manager: Arc<EpochManager>,
655 epoch: u64,
656}
657
658impl Drop for EpochGuard {
659 fn drop(&mut self) {
660 self.manager.leave_epoch(self.epoch);
661 }
662}
663
664pub struct EpochManager {
666 global_epoch: AtomicU64,
668 epoch_counts: [AtomicUsize; 4],
670 retired: Mutex<Vec<(u64, Box<dyn Send>)>>,
672}
673
674impl Default for EpochManager {
675 fn default() -> Self {
676 Self::new()
677 }
678}
679
680impl EpochManager {
681 pub fn new() -> Self {
682 Self {
683 global_epoch: AtomicU64::new(0),
684 epoch_counts: std::array::from_fn(|_| AtomicUsize::new(0)),
685 retired: Mutex::new(Vec::new()),
686 }
687 }
688
689 pub fn pin(self: &Arc<Self>) -> EpochGuard {
691 let epoch = self.global_epoch.load(Ordering::Acquire);
692 self.epoch_counts[(epoch % 4) as usize].fetch_add(1, Ordering::AcqRel);
693
694 EpochGuard {
695 manager: self.clone(),
696 epoch,
697 }
698 }
699
700 fn leave_epoch(&self, epoch: u64) {
702 self.epoch_counts[(epoch % 4) as usize].fetch_sub(1, Ordering::AcqRel);
703 }
704
705 pub fn advance(&self) {
707 let current = self.global_epoch.load(Ordering::Acquire);
708 let old_epoch = (current + 2) % 4; if self.epoch_counts[old_epoch as usize].load(Ordering::Acquire) == 0 {
712 self.global_epoch.fetch_add(1, Ordering::AcqRel);
713 self.reclaim(current.saturating_sub(2));
714 }
715 }
716
717 pub fn retire<T: Send + 'static>(&self, item: T) {
719 let epoch = self.global_epoch.load(Ordering::Acquire);
720 let mut retired = self.retired.lock();
721 retired.push((epoch, Box::new(item)));
722 }
723
724 fn reclaim(&self, safe_epoch: u64) {
726 let mut retired = self.retired.lock();
727 retired.retain(|(epoch, _)| *epoch > safe_epoch);
728 }
729}
730
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_intent_lock_compatibility() {
        use IntentLock::*;

        // Rows: mode on the left; columns: mode on the right, in the order IS, IX, S, X.
        let modes = [IntentShared, IntentExclusive, Shared, Exclusive];
        let matrix = [
            [true, true, true, false],
            [true, true, false, false],
            [true, false, true, false],
            [false, false, false, false],
        ];
        for (left, row) in modes.iter().zip(matrix.iter()) {
            for (right, &expected) in modes.iter().zip(row.iter()) {
                assert_eq!(left.is_compatible(right), expected);
            }
        }
    }

    #[test]
    fn test_sharded_lock_table_basic() {
        let table = ShardedLockTable::new();
        let attempt = |row, mode, txn| table.try_lock(row, mode, txn);

        assert_eq!(attempt(1, LockMode::Exclusive, 100), LockResult::Acquired);
        assert_eq!(attempt(1, LockMode::Exclusive, 200), LockResult::WouldBlock);
        assert_eq!(attempt(1, LockMode::Shared, 200), LockResult::WouldBlock);
        assert_eq!(attempt(2, LockMode::Exclusive, 200), LockResult::Acquired);

        assert!(table.unlock(1, 100));
        assert_eq!(attempt(1, LockMode::Exclusive, 200), LockResult::Acquired);
    }

    #[test]
    fn test_sharded_lock_table_shared() {
        let table = ShardedLockTable::new();
        let readers = [100, 200, 300];

        for txn in readers {
            assert_eq!(table.try_lock(1, LockMode::Shared, txn), LockResult::Acquired);
        }
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::WouldBlock
        );

        for txn in readers {
            assert!(table.unlock(1, txn));
        }
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_upgrade() {
        let table = ShardedLockTable::new();
        let steps = [
            (LockMode::Shared, 100, LockResult::Acquired),
            (LockMode::Exclusive, 100, LockResult::Acquired),
            (LockMode::Shared, 200, LockResult::WouldBlock),
        ];
        for (mode, txn, expected) in steps {
            assert_eq!(table.try_lock(1, mode, txn), expected);
        }
    }

    #[test]
    fn test_lock_manager_hierarchical() {
        let manager = LockManager::new();
        let steps = [
            (100, LockMode::Exclusive, 1000, LockResult::Acquired),
            (200, LockMode::Shared, 2000, LockResult::Acquired),
            (100, LockMode::Exclusive, 2000, LockResult::WouldBlock),
        ];
        for (row, mode, txn, expected) in steps {
            assert_eq!(manager.lock_row(1, row, mode, txn), expected);
        }

        assert!(manager.release_all(1000) >= 1);
    }

    #[test]
    fn test_optimistic_version() {
        let version = OptimisticVersion::new();

        let before = version.read_version();
        assert!(version.is_stable(before));
        assert!(version.validate(before));

        let guard = version.try_write_begin().unwrap();
        assert!(!version.is_stable(version.read_version()));
        guard.commit();

        let after = version.read_version();
        assert!(version.is_stable(after));
        assert_eq!(after, 2);
    }

    #[test]
    fn test_optimistic_concurrent() {
        let version = Arc::new(OptimisticVersion::new());
        let first = version.try_write_begin().unwrap();

        let other_handle = Arc::clone(&version);
        assert!(other_handle.try_write_begin().is_none());

        first.commit();
        version.try_write_begin().unwrap().commit();
    }

    #[test]
    fn test_epoch_manager() {
        let manager = Arc::new(EpochManager::new());

        let first = manager.pin();
        assert_eq!(first.epoch, 0);

        manager.retire(vec![1, 2, 3]);
        manager.advance();

        let second = manager.pin();
        assert!(second.epoch >= first.epoch);

        drop(first);
        drop(second);

        for _ in 0..2 {
            manager.advance();
        }
    }

    #[test]
    fn test_sharded_distribution() {
        let table = ShardedLockTable::new();
        for row in 0..1000u128 {
            assert_eq!(table.try_lock(row, LockMode::Shared, 1), LockResult::Acquired);
        }

        let non_empty_shards = table
            .shards
            .iter()
            .filter(|shard| !shard.lock().is_empty())
            .count();

        assert!(
            non_empty_shards > 100,
            "Expected better distribution: {} shards used",
            non_empty_shards
        );
    }

    #[test]
    fn test_unlock_all() {
        let table = ShardedLockTable::new();
        for row in 0..100u128 {
            let owner = if row < 50 { 100 } else { 200 };
            table.try_lock(row, LockMode::Exclusive, owner);
        }

        assert_eq!(table.unlock_all(100), 50);
        assert_eq!(
            table.try_lock(50, LockMode::Exclusive, 300),
            LockResult::WouldBlock
        );
        assert_eq!(
            table.try_lock(0, LockMode::Exclusive, 300),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_concurrent_locks() {
        let table = Arc::new(ShardedLockTable::new());

        let workers: Vec<_> = (0..16u64)
            .map(|txn_id| {
                let table = Arc::clone(&table);
                thread::spawn(move || {
                    let base = txn_id as u128 * 100;
                    for row in base..base + 100 {
                        let result = table.try_lock(row, LockMode::Exclusive, txn_id);
                        assert_eq!(result, LockResult::Acquired);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(
            table.stats().exclusive_acquired.load(Ordering::Relaxed),
            1600
        );
    }
}
1038}