//! Lock management primitives: a hierarchical (table/row) lock manager with
//! intent locks, a sharded row-lock table, an optimistic version counter for
//! validation-based concurrency, and a simple epoch-based reclamation scheme.

use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

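/// Identifier of a table.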
pub type TableId = u64;

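/// Identifier of a row within a table.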
pub type RowId = u128;

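/// Identifier of a transaction.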
pub type TxnId = u64;

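/// Lock modes used at the table level of the hierarchy. The intent modes
/// announce that a transaction holds (or will request) row-level locks under
/// the table, so table-level and row-level requests can be checked cheaply.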
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentLock {
    /// Intent to take shared (read) locks on individual rows.
    IntentShared,
    /// Intent to take exclusive (write) locks on individual rows.
    IntentExclusive,
    /// Shared lock on the whole table.
    Shared,
    /// Exclusive lock on the whole table.
    Exclusive,
}

impl IntentLock {
    /// Standard intent-lock compatibility matrix: the intent modes are
    /// compatible with each other, `Shared` is compatible only with readers,
    /// and `Exclusive` is compatible with nothing.
    pub fn is_compatible(&self, other: &IntentLock) -> bool {
        use IntentLock::*;
        matches!(
            (self, other),
            (IntentShared, IntentShared)
                | (IntentShared, IntentExclusive)
                | (IntentShared, Shared)
                | (IntentExclusive, IntentShared)
                | (IntentExclusive, IntentExclusive)
                | (Shared, IntentShared)
                | (Shared, Shared)
        )
    }
}

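/// Lock modes used at the row level.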
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    /// Shared (read) lock; multiple transactions may hold it concurrently.
    Shared,
    /// Exclusive (write) lock; only a single transaction may hold it.
    Exclusive,
}

impl LockMode {
    /// Only shared locks are compatible with each other.
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        matches!((self, other), (LockMode::Shared, LockMode::Shared))
    }
}

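/// State of a single table-level lock: the strongest mode currently held and
/// the transactions holding it.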
#[derive(Debug)]
struct TableLockEntry {
    /// Strongest mode held by the current holders.
    mode: IntentLock,
    /// Transactions currently holding the lock.
    holders: Vec<TxnId>,
    /// Queued requests; currently unused (no blocking path is implemented).
    #[allow(dead_code)]
    waiters: Vec<(TxnId, IntentLock)>,
}

impl TableLockEntry {
    fn new(mode: IntentLock, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
            waiters: Vec::new(),
        }
    }
}

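/// State of a single row-level lock.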
#[derive(Debug)]
struct RowLockEntry {
    mode: LockMode,
    holders: Vec<TxnId>,
}

impl RowLockEntry {
    fn new(mode: LockMode, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
        }
    }
}

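/// Row-lock table sharded across 256 independently locked hash maps to reduce
/// contention: each row id hashes to a shard, so unrelated rows rarely contend
/// on the same mutex.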
pub struct ShardedLockTable {
    shards: [Mutex<HashMap<RowId, RowLockEntry>>; 256],
    stats: LockTableStats,
}

impl Default for ShardedLockTable {
    fn default() -> Self {
        Self::new()
    }
}

impl ShardedLockTable {
    pub fn new() -> Self {
        Self {
            shards: std::array::from_fn(|_| Mutex::new(HashMap::new())),
            stats: LockTableStats::default(),
        }
    }

    /// Maps a row id to a shard by folding its high and low 64 bits together.
    #[inline]
    fn shard_index(&self, row_id: RowId) -> usize {
        ((row_id >> 64) as usize ^ (row_id as usize)) % 256
    }

    /// Attempts to take `mode` on `row_id` for `txn_id` without blocking.
    pub fn try_lock(&self, row_id: RowId, mode: LockMode, txn_id: TxnId) -> LockResult {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id) {
            if entry.holders.contains(&txn_id) {
                // Re-entrant request: upgrade Shared -> Exclusive only if we are
                // the sole holder; otherwise the upgrade would conflict.
                if entry.mode == LockMode::Shared && mode == LockMode::Exclusive {
                    if entry.holders.len() == 1 {
                        entry.mode = LockMode::Exclusive;
                        self.stats.upgrades.fetch_add(1, Ordering::Relaxed);
                        return LockResult::Acquired;
                    } else {
                        self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
                        return LockResult::WouldBlock;
                    }
                }
                return LockResult::AlreadyHeld;
            }

            // Another transaction holds the lock: join only if compatible
            // (i.e. both shared).
            if entry.mode.is_compatible(&mode) {
                entry.holders.push(txn_id);
                self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed);
                return LockResult::Acquired;
            }

            self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
            return LockResult::WouldBlock;
        }

        // No existing entry: take the lock outright.
        shard.insert(row_id, RowLockEntry::new(mode, txn_id));
        match mode {
            LockMode::Shared => self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed),
            LockMode::Exclusive => self
                .stats
                .exclusive_acquired
                .fetch_add(1, Ordering::Relaxed),
        };
        LockResult::Acquired
    }

    /// Releases `txn_id`'s lock on `row_id`, removing the entry once the last
    /// holder leaves. Returns `false` if the lock was not held.
    pub fn unlock(&self, row_id: RowId, txn_id: TxnId) -> bool {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats.released.fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                shard.remove(&row_id);
            }
            return true;
        }
        false
    }

    /// Releases every row lock held by `txn_id`, returning how many were released.
    pub fn unlock_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;
        for shard in &self.shards {
            let mut shard_guard = shard.lock();
            let to_remove: Vec<RowId> = shard_guard
                .iter()
                .filter(|(_, entry)| entry.holders.contains(&txn_id))
                .map(|(&row_id, _)| row_id)
                .collect();

            for row_id in to_remove {
                if let Some(entry) = shard_guard.get_mut(&row_id)
                    && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
                {
                    entry.holders.remove(pos);
                    count += 1;

                    if entry.holders.is_empty() {
                        shard_guard.remove(&row_id);
                    }
                }
            }
        }
        self.stats
            .released
            .fetch_add(count as u64, Ordering::Relaxed);
        count
    }

    pub fn stats(&self) -> &LockTableStats {
        &self.stats
    }
}

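/// Counters describing row-lock activity.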
#[derive(Debug, Default)]
pub struct LockTableStats {
    pub shared_acquired: AtomicU64,
    pub exclusive_acquired: AtomicU64,
    pub upgrades: AtomicU64,
    pub conflicts: AtomicU64,
    pub released: AtomicU64,
}

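/// Outcome of a non-blocking lock request.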
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The lock was granted.
    Acquired,
    /// The requesting transaction already holds the lock.
    AlreadyHeld,
    /// The lock is held in an incompatible mode; the caller would have to wait.
    WouldBlock,
    /// Granting the lock would form a deadlock (reserved; not produced here).
    Deadlock,
}

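/// Two-level lock manager: intent locks at the table level guard per-table
/// sharded row-lock tables.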
pub struct LockManager {
    /// Table-level locks keyed by table id.
    table_locks: DashMap<TableId, TableLockEntry>,
    /// Per-table row-lock tables, created lazily.
    row_locks: DashMap<TableId, Arc<ShardedLockTable>>,
    /// Monotonic epoch counter.
    epoch: AtomicU64,
    stats: LockManagerStats,
}

impl Default for LockManager {
    fn default() -> Self {
        Self::new()
    }
}

impl LockManager {
    pub fn new() -> Self {
        Self {
            table_locks: DashMap::new(),
            row_locks: DashMap::new(),
            epoch: AtomicU64::new(0),
            stats: LockManagerStats::default(),
        }
    }

    /// Attempts to take a table-level (intent) lock for `txn_id` without blocking.
    pub fn lock_table(&self, table_id: TableId, mode: IntentLock, txn_id: TxnId) -> LockResult {
        use dashmap::mapref::entry::Entry;

        match self.table_locks.entry(table_id) {
            Entry::Vacant(vacant) => {
                vacant.insert(TableLockEntry::new(mode, txn_id));
                self.stats
                    .table_locks_acquired
                    .fetch_add(1, Ordering::Relaxed);
                LockResult::Acquired
            }
            Entry::Occupied(mut occupied) => {
                let entry = occupied.get_mut();

                if entry.holders.contains(&txn_id) {
                    return LockResult::AlreadyHeld;
                }

                if entry.mode.is_compatible(&mode) {
                    // Keep the recorded group mode at the strongest mode held so
                    // far, so later requests (e.g. a table-level Shared) are
                    // checked against it rather than the first holder's mode.
                    if entry.mode == IntentLock::IntentShared && mode != IntentLock::IntentShared {
                        entry.mode = mode;
                    }
                    entry.holders.push(txn_id);
                    self.stats
                        .table_locks_acquired
                        .fetch_add(1, Ordering::Relaxed);
                    return LockResult::Acquired;
                }

                self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
                LockResult::WouldBlock
            }
        }
    }

    /// Releases `txn_id`'s table-level lock, removing the entry once the last
    /// holder leaves. Returns `false` if the lock was not held.
    pub fn unlock_table(&self, table_id: TableId, txn_id: TxnId) -> bool {
        if let Some(mut entry) = self.table_locks.get_mut(&table_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats
                .table_locks_released
                .fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                // Drop the shard guard before removing the key so we do not
                // deadlock against ourselves inside DashMap.
                drop(entry);
                self.table_locks.remove(&table_id);
            }
            return true;
        }
        false
    }

    fn get_row_lock_table(&self, table_id: TableId) -> Arc<ShardedLockTable> {
        self.row_locks
            .entry(table_id)
            .or_insert_with(|| Arc::new(ShardedLockTable::new()))
            .clone()
    }

    /// Locks a row for `txn_id`: first takes the matching intent lock on the
    /// table, then the row lock itself.
    pub fn lock_row(
        &self,
        table_id: TableId,
        row_id: RowId,
        mode: LockMode,
        txn_id: TxnId,
    ) -> LockResult {
        let intent_mode = match mode {
            LockMode::Shared => IntentLock::IntentShared,
            LockMode::Exclusive => IntentLock::IntentExclusive,
        };

        match self.lock_table(table_id, intent_mode, txn_id) {
            LockResult::Acquired | LockResult::AlreadyHeld => {}
            result => return result,
        }

        let row_locks = self.get_row_lock_table(table_id);
        row_locks.try_lock(row_id, mode, txn_id)
    }

    pub fn unlock_row(&self, table_id: TableId, row_id: RowId, txn_id: TxnId) -> bool {
        if let Some(row_locks) = self.row_locks.get(&table_id) {
            return row_locks.unlock(row_id, txn_id);
        }
        false
    }

    /// Releases every row and table lock held by `txn_id`, returning the total
    /// number of locks released.
    pub fn release_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;

        for entry in self.row_locks.iter() {
            count += entry.value().unlock_all(txn_id);
        }

        let table_ids: Vec<TableId> = self
            .table_locks
            .iter()
            .filter(|e| e.value().holders.contains(&txn_id))
            .map(|e| *e.key())
            .collect();

        for table_id in table_ids {
            if self.unlock_table(table_id, txn_id) {
                count += 1;
            }
        }

        count
    }

    /// Advances the epoch counter and returns the previous value.
    pub fn enter_epoch(&self) -> u64 {
        self.epoch.fetch_add(1, Ordering::AcqRel)
    }

    pub fn current_epoch(&self) -> u64 {
        self.epoch.load(Ordering::Acquire)
    }

    pub fn stats(&self) -> &LockManagerStats {
        &self.stats
    }
}

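/// Counters describing table-level lock activity.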
#[derive(Debug, Default)]
pub struct LockManagerStats {
    pub table_locks_acquired: AtomicU64,
    pub table_locks_released: AtomicU64,
    pub table_conflicts: AtomicU64,
}

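/// Seqlock-style optimistic version counter: even values mean the protected
/// data is stable, odd values mean a write is in progress. Readers record the
/// version, read the data, and then re-validate the version.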
pub struct OptimisticVersion {
    version: AtomicU64,
}

impl Default for OptimisticVersion {
    fn default() -> Self {
        Self::new()
    }
}

impl OptimisticVersion {
    pub fn new() -> Self {
        Self {
            version: AtomicU64::new(0),
        }
    }

    /// Returns the current version for later validation.
    #[inline]
    pub fn read_version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// A version is stable (no writer active) when it is even.
    #[inline]
    pub fn is_stable(&self, version: u64) -> bool {
        version & 1 == 0
    }

    /// Returns `true` if the version has not changed since `read_version`,
    /// i.e. the optimistic read saw a consistent snapshot.
    #[inline]
    pub fn validate(&self, read_version: u64) -> bool {
        std::sync::atomic::fence(Ordering::Acquire);
        self.version.load(Ordering::Relaxed) == read_version
    }

    /// Attempts to start a write by bumping the version to an odd value.
    /// Returns `None` if another writer is active or the CAS loses a race.
    pub fn try_write_begin(&self) -> Option<WriteGuard<'_>> {
        let current = self.version.load(Ordering::Acquire);

        if !self.is_stable(current) {
            return None;
        }

        match self.version.compare_exchange(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => Some(WriteGuard {
                version: &self.version,
                start_version: current,
            }),
            Err(_) => None,
        }
    }
}

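/// RAII guard for an in-progress optimistic write. Dropping it without calling
/// [`WriteGuard::commit`] restores the original version (treated as an abort).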
pub struct WriteGuard<'a> {
    version: &'a AtomicU64,
    start_version: u64,
}

impl<'a> WriteGuard<'a> {
    /// Publishes the write by moving the version to the next even value.
    pub fn commit(self) {
        self.version
            .store(self.start_version + 2, Ordering::Release);
        // Skip Drop, which would roll the version back.
        std::mem::forget(self);
    }

    /// Abandons the write, restoring the pre-write version.
    pub fn abort(self) {
        self.version.store(self.start_version, Ordering::Release);
        std::mem::forget(self);
    }
}

impl<'a> Drop for WriteGuard<'a> {
    fn drop(&mut self) {
        // An un-committed guard behaves like an abort.
        self.version.store(self.start_version, Ordering::Release);
    }
}

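/// RAII guard returned by [`EpochManager::pin`]; the epoch is left on drop.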
pub struct EpochGuard {
    manager: Arc<EpochManager>,
    epoch: u64,
}

impl Drop for EpochGuard {
    fn drop(&mut self) {
        self.manager.leave_epoch(self.epoch);
    }
}

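/// Simplified epoch-based reclamation: readers pin the current global epoch,
/// retired items are tagged with the epoch in which they were retired, and
/// `advance` reclaims old items only after the slot two epochs behind has
/// drained of pinned readers.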
pub struct EpochManager {
    /// Monotonically increasing global epoch.
    global_epoch: AtomicU64,
    /// Number of guards currently pinned in each of the four epoch slots
    /// (indexed by `epoch % 4`).
    epoch_counts: [AtomicUsize; 4],
    /// Retired items awaiting reclamation, tagged with their retire epoch.
    retired: Mutex<Vec<(u64, Box<dyn Send>)>>,
}

impl Default for EpochManager {
    fn default() -> Self {
        Self::new()
    }
}

impl EpochManager {
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            epoch_counts: std::array::from_fn(|_| AtomicUsize::new(0)),
            retired: Mutex::new(Vec::new()),
        }
    }

    /// Pins the current epoch; the returned guard unpins it on drop.
    pub fn pin(self: &Arc<Self>) -> EpochGuard {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        self.epoch_counts[(epoch % 4) as usize].fetch_add(1, Ordering::AcqRel);

        EpochGuard {
            manager: self.clone(),
            epoch,
        }
    }

    fn leave_epoch(&self, epoch: u64) {
        self.epoch_counts[(epoch % 4) as usize].fetch_sub(1, Ordering::AcqRel);
    }

    /// Advances the global epoch if the slot two epochs behind has drained,
    /// then reclaims items retired at or before that epoch.
    pub fn advance(&self) {
        let current = self.global_epoch.load(Ordering::Acquire);
        // Slot (current + 2) % 4 is the same slot as (current - 2) % 4, i.e.
        // the epoch two behind the current one.
        let old_epoch = (current + 2) % 4;
        if self.epoch_counts[old_epoch as usize].load(Ordering::Acquire) == 0 {
            self.global_epoch.fetch_add(1, Ordering::AcqRel);
            self.reclaim(current.saturating_sub(2));
        }
    }

    /// Defers destruction of `item` until reclamation deems it unreachable.
    pub fn retire<T: Send + 'static>(&self, item: T) {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        let mut retired = self.retired.lock();
        retired.push((epoch, Box::new(item)));
    }

    /// Drops everything retired at or before `safe_epoch`.
    fn reclaim(&self, safe_epoch: u64) {
        let mut retired = self.retired.lock();
        retired.retain(|(epoch, _)| *epoch > safe_epoch);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_intent_lock_compatibility() {
        use IntentLock::*;

        assert!(IntentShared.is_compatible(&IntentShared));
        assert!(IntentShared.is_compatible(&IntentExclusive));
        assert!(IntentShared.is_compatible(&Shared));
        assert!(!IntentShared.is_compatible(&Exclusive));

        assert!(IntentExclusive.is_compatible(&IntentShared));
        assert!(IntentExclusive.is_compatible(&IntentExclusive));
        assert!(!IntentExclusive.is_compatible(&Shared));
        assert!(!IntentExclusive.is_compatible(&Exclusive));

        assert!(Shared.is_compatible(&IntentShared));
        assert!(!Shared.is_compatible(&IntentExclusive));
        assert!(Shared.is_compatible(&Shared));
        assert!(!Shared.is_compatible(&Exclusive));

        assert!(!Exclusive.is_compatible(&IntentShared));
        assert!(!Exclusive.is_compatible(&IntentExclusive));
        assert!(!Exclusive.is_compatible(&Shared));
        assert!(!Exclusive.is_compatible(&Exclusive));
    }

    #[test]
    fn test_sharded_lock_table_basic() {
        let table = ShardedLockTable::new();

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::WouldBlock
        );

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );

        assert_eq!(
            table.try_lock(2, LockMode::Exclusive, 200),
            LockResult::Acquired
        );

        assert!(table.unlock(1, 100));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_table_shared() {
        let table = ShardedLockTable::new();

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 300),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::WouldBlock
        );

        assert!(table.unlock(1, 100));
        assert!(table.unlock(1, 200));
        assert!(table.unlock(1, 300));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_upgrade() {
        let table = ShardedLockTable::new();

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );
    }

    #[test]
    fn test_lock_manager_hierarchical() {
        let manager = LockManager::new();

        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 1000),
            LockResult::Acquired
        );

        assert_eq!(
            manager.lock_row(1, 200, LockMode::Shared, 2000),
            LockResult::Acquired
        );

        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 2000),
            LockResult::WouldBlock
        );

        let released = manager.release_all(1000);
        assert!(released >= 1);
    }

    #[test]
    fn test_optimistic_version() {
        let version = OptimisticVersion::new();

        let v = version.read_version();
        assert!(version.is_stable(v));
        assert!(version.validate(v));

        {
            let guard = version.try_write_begin().unwrap();
            let v_during = version.read_version();
            assert!(!version.is_stable(v_during));
            guard.commit();
        }

        let v2 = version.read_version();
        assert!(version.is_stable(v2));
        assert_eq!(v2, 2);
    }

    #[test]
    fn test_optimistic_concurrent() {
        let version = Arc::new(OptimisticVersion::new());

        let guard = version.try_write_begin().unwrap();

        let version2 = version.clone();
        let result = version2.try_write_begin();
        assert!(result.is_none());

        guard.commit();

        let guard2 = version.try_write_begin().unwrap();
        guard2.commit();
    }

    #[test]
    fn test_epoch_manager() {
        let manager = Arc::new(EpochManager::new());

        let guard1 = manager.pin();
        assert_eq!(guard1.epoch, 0);

        manager.retire(vec![1, 2, 3]);

        manager.advance();

        let guard2 = manager.pin();
        assert!(guard2.epoch >= guard1.epoch);

        drop(guard1);
        drop(guard2);

        manager.advance();
        manager.advance();
    }

    #[test]
    fn test_sharded_distribution() {
        let table = ShardedLockTable::new();

        for i in 0..1000u128 {
            assert_eq!(table.try_lock(i, LockMode::Shared, 1), LockResult::Acquired);
        }

        let mut non_empty_shards = 0;
        for shard in &table.shards {
            if !shard.lock().is_empty() {
                non_empty_shards += 1;
            }
        }

        assert!(
            non_empty_shards > 100,
            "Expected better distribution: {} shards used",
            non_empty_shards
        );
    }

    #[test]
    fn test_unlock_all() {
        let table = ShardedLockTable::new();

        for i in 0..50u128 {
            table.try_lock(i, LockMode::Exclusive, 100);
        }

        for i in 50..100u128 {
            table.try_lock(i, LockMode::Exclusive, 200);
        }

        let released = table.unlock_all(100);
        assert_eq!(released, 50);

        assert_eq!(
            table.try_lock(50, LockMode::Exclusive, 300),
            LockResult::WouldBlock
        );

        assert_eq!(
            table.try_lock(0, LockMode::Exclusive, 300),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_concurrent_locks() {
        let table = Arc::new(ShardedLockTable::new());
        let mut handles = vec![];

        for txn_id in 0..16u64 {
            let table = table.clone();
            handles.push(thread::spawn(move || {
                let start = txn_id as u128 * 100;
                for i in 0..100 {
                    let result = table.try_lock(start + i, LockMode::Exclusive, txn_id);
                    assert_eq!(result, LockResult::Acquired);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(
            table.stats().exclusive_acquired.load(Ordering::Relaxed),
            1600
        );
    }
}