//! Hierarchical lock management: table-level intent locks, a 256-way sharded
//! row lock table, an optimistic version counter for validated reads, and a
//! simple epoch-based reclamation scheme for retired objects.

use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Identifier of a table.
pub type TableId = u64;

/// Identifier of a row within a table.
pub type RowId = u128;

/// Identifier of a transaction.
pub type TxnId = u64;

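/// Lock modes for hierarchical (intent) locking at table granularity.
///
/// The compatibility matrix implemented by [`IntentLock::is_compatible`]
/// (`✓` = compatible) is:
///
/// |        | IS | IX | S | X |
/// |--------|----|----|---|---|
/// | **IS** | ✓  | ✓  | ✓ |   |
/// | **IX** | ✓  | ✓  |   |   |
/// | **S**  | ✓  |    | ✓ |   |
/// | **X**  |    |    |   |   |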
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentLock {
    /// Intent to take shared locks at row granularity (IS).
    IntentShared,
    /// Intent to take exclusive locks at row granularity (IX).
    IntentExclusive,
    /// Shared lock on the whole table (S).
    Shared,
    /// Exclusive lock on the whole table (X).
    Exclusive,
}

impl IntentLock {
    /// Returns `true` if a lock held in `self` mode can coexist with a
    /// request for `other`.
    pub fn is_compatible(&self, other: &IntentLock) -> bool {
        use IntentLock::*;
        matches!(
            (self, other),
            (IntentShared, IntentShared)
                | (IntentShared, IntentExclusive)
                | (IntentShared, Shared)
                | (IntentExclusive, IntentShared)
                | (IntentExclusive, IntentExclusive)
                | (Shared, IntentShared)
                | (Shared, Shared)
        )
    }
}

/// Lock modes for individual rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    /// Multiple readers may hold the lock concurrently.
    Shared,
    /// A single writer excludes all other holders.
    Exclusive,
}

impl LockMode {
    /// Only two shared locks are compatible with each other.
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        matches!((self, other), (LockMode::Shared, LockMode::Shared))
    }
}

/// Lock state for a single table.
#[derive(Debug)]
struct TableLockEntry {
    mode: IntentLock,
    holders: Vec<TxnId>,
    /// Reserved for queued waiters; not consulted yet.
    #[allow(dead_code)]
    waiters: Vec<(TxnId, IntentLock)>,
}

impl TableLockEntry {
    fn new(mode: IntentLock, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
            waiters: Vec::new(),
        }
    }
}

/// Lock state for a single row.
#[derive(Debug)]
struct RowLockEntry {
    mode: LockMode,
    holders: Vec<TxnId>,
}

impl RowLockEntry {
    fn new(mode: LockMode, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
        }
    }
}

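/// Row-level lock table partitioned into 256 shards to reduce mutex
/// contention: each shard is an independent `Mutex<HashMap<RowId, RowLockEntry>>`,
/// and a row is routed to a shard by mixing the two halves of its 128-bit id.
///
/// Illustrative usage (a sketch, marked `ignore` so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let table = ShardedLockTable::new();
/// assert_eq!(table.try_lock(1, LockMode::Exclusive, 100), LockResult::Acquired);
/// // A second transaction conflicts on the same row...
/// assert_eq!(table.try_lock(1, LockMode::Shared, 200), LockResult::WouldBlock);
/// // ...until the first holder releases it.
/// assert!(table.unlock(1, 100));
/// assert_eq!(table.try_lock(1, LockMode::Shared, 200), LockResult::Acquired);
/// ```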
pub struct ShardedLockTable {
    shards: [Mutex<HashMap<RowId, RowLockEntry>>; 256],
    stats: LockTableStats,
}

impl Default for ShardedLockTable {
    fn default() -> Self {
        Self::new()
    }
}

impl ShardedLockTable {
    pub fn new() -> Self {
        Self {
            shards: std::array::from_fn(|_| Mutex::new(HashMap::new())),
            stats: LockTableStats::default(),
        }
    }

    /// Maps a row id to one of the 256 shards by XOR-folding its high and
    /// low 64 bits.
    #[inline]
    fn shard_index(&self, row_id: RowId) -> usize {
        ((row_id >> 64) as usize ^ (row_id as usize)) % 256
    }

    /// Attempts to acquire `mode` on `row_id` for `txn_id` without blocking.
    pub fn try_lock(&self, row_id: RowId, mode: LockMode, txn_id: TxnId) -> LockResult {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id) {
            if entry.holders.contains(&txn_id) {
                // Re-entrant request: upgrade Shared -> Exclusive only if this
                // transaction is the sole holder.
                if entry.mode == LockMode::Shared && mode == LockMode::Exclusive {
                    if entry.holders.len() == 1 {
                        entry.mode = LockMode::Exclusive;
                        self.stats.upgrades.fetch_add(1, Ordering::Relaxed);
                        return LockResult::Acquired;
                    } else {
                        self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
                        return LockResult::WouldBlock;
                    }
                }
                return LockResult::AlreadyHeld;
            }

            // A compatible request from another transaction joins the holder
            // list (only Shared locks are compatible with each other).
            if entry.mode.is_compatible(&mode) {
                entry.holders.push(txn_id);
                self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed);
                return LockResult::Acquired;
            }

            self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
            return LockResult::WouldBlock;
        }

        // No existing entry: create one for this transaction.
        shard.insert(row_id, RowLockEntry::new(mode, txn_id));
        match mode {
            LockMode::Shared => self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed),
            LockMode::Exclusive => self
                .stats
                .exclusive_acquired
                .fetch_add(1, Ordering::Relaxed),
        };
        LockResult::Acquired
    }

    /// Releases `txn_id`'s lock on `row_id`, returning `true` if one was held.
    pub fn unlock(&self, row_id: RowId, txn_id: TxnId) -> bool {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats.released.fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                shard.remove(&row_id);
            }
            return true;
        }
        false
    }

    /// Releases every row lock held by `txn_id`, returning how many were released.
    pub fn unlock_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;
        for shard in &self.shards {
            let mut shard_guard = shard.lock();
            let to_remove: Vec<RowId> = shard_guard
                .iter()
                .filter(|(_, entry)| entry.holders.contains(&txn_id))
                .map(|(&row_id, _)| row_id)
                .collect();

            for row_id in to_remove {
                if let Some(entry) = shard_guard.get_mut(&row_id)
                    && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
                {
                    entry.holders.remove(pos);
                    count += 1;

                    if entry.holders.is_empty() {
                        shard_guard.remove(&row_id);
                    }
                }
            }
        }
        self.stats
            .released
            .fetch_add(count as u64, Ordering::Relaxed);
        count
    }

    pub fn stats(&self) -> &LockTableStats {
        &self.stats
    }
}

/// Counters tracking row lock table activity.
#[derive(Debug, Default)]
pub struct LockTableStats {
    pub shared_acquired: AtomicU64,
    pub exclusive_acquired: AtomicU64,
    pub upgrades: AtomicU64,
    pub conflicts: AtomicU64,
    pub released: AtomicU64,
}

/// Outcome of a non-blocking lock request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The lock was granted.
    Acquired,
    /// The requesting transaction already holds a lock on the target.
    AlreadyHeld,
    /// The request conflicts with locks held by other transactions.
    WouldBlock,
    /// Granting the request would result in a deadlock.
    Deadlock,
}

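/// Two-level lock manager: intent locks at table granularity backed by a
/// [`ShardedLockTable`] of row locks per table.
///
/// [`LockManager::lock_row`] first acquires the matching intent lock on the
/// table (IS for shared row locks, IX for exclusive ones) and only then tries
/// the row lock, so table-level and row-level requests coordinate through the
/// intent modes.
///
/// Illustrative usage (a sketch, marked `ignore` so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let manager = LockManager::new();
/// assert_eq!(manager.lock_row(1, 100, LockMode::Exclusive, 1000), LockResult::Acquired);
/// // Another transaction conflicts on the same row...
/// assert_eq!(manager.lock_row(1, 100, LockMode::Exclusive, 2000), LockResult::WouldBlock);
/// // ...until the first transaction releases everything it holds.
/// let released = manager.release_all(1000);
/// assert!(released >= 1);
/// ```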
pub struct LockManager {
    /// Intent locks held at table granularity.
    table_locks: DashMap<TableId, TableLockEntry>,
    /// One sharded row lock table per table, created lazily.
    row_locks: DashMap<TableId, Arc<ShardedLockTable>>,
    /// Monotonically increasing epoch counter.
    epoch: AtomicU64,
    stats: LockManagerStats,
}

impl Default for LockManager {
    fn default() -> Self {
        Self::new()
    }
}

impl LockManager {
    pub fn new() -> Self {
        Self {
            table_locks: DashMap::new(),
            row_locks: DashMap::new(),
            epoch: AtomicU64::new(0),
            stats: LockManagerStats::default(),
        }
    }

    /// Attempts to acquire a table-level intent lock without blocking.
    pub fn lock_table(&self, table_id: TableId, mode: IntentLock, txn_id: TxnId) -> LockResult {
        use dashmap::mapref::entry::Entry;

        match self.table_locks.entry(table_id) {
            Entry::Vacant(vacant) => {
                vacant.insert(TableLockEntry::new(mode, txn_id));
                self.stats
                    .table_locks_acquired
                    .fetch_add(1, Ordering::Relaxed);
                LockResult::Acquired
            }
            Entry::Occupied(mut occupied) => {
                let entry = occupied.get_mut();

                // Re-entrant requests are reported as already held; table-level
                // upgrades are not performed here.
                if entry.holders.contains(&txn_id) {
                    return LockResult::AlreadyHeld;
                }

                if entry.mode.is_compatible(&mode) {
                    entry.holders.push(txn_id);
                    self.stats
                        .table_locks_acquired
                        .fetch_add(1, Ordering::Relaxed);
                    return LockResult::Acquired;
                }

                self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
                LockResult::WouldBlock
            }
        }
    }

    /// Releases `txn_id`'s table lock, returning `true` if one was held.
    pub fn unlock_table(&self, table_id: TableId, txn_id: TxnId) -> bool {
        if let Some(mut entry) = self.table_locks.get_mut(&table_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats
                .table_locks_released
                .fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                // Release the map guard before removing the now-empty entry.
                drop(entry);
                self.table_locks.remove(&table_id);
            }
            return true;
        }
        false
    }

    /// Returns the row lock table for `table_id`, creating it on first use.
    fn get_row_lock_table(&self, table_id: TableId) -> Arc<ShardedLockTable> {
        self.row_locks
            .entry(table_id)
            .or_insert_with(|| Arc::new(ShardedLockTable::new()))
            .clone()
    }

    /// Acquires a row lock, taking the corresponding table intent lock first.
    pub fn lock_row(
        &self,
        table_id: TableId,
        row_id: RowId,
        mode: LockMode,
        txn_id: TxnId,
    ) -> LockResult {
        // A shared row lock requires IS on the table; an exclusive row lock
        // requires IX.
        let intent_mode = match mode {
            LockMode::Shared => IntentLock::IntentShared,
            LockMode::Exclusive => IntentLock::IntentExclusive,
        };

        match self.lock_table(table_id, intent_mode, txn_id) {
            LockResult::Acquired | LockResult::AlreadyHeld => {}
            result => return result,
        }

        let row_locks = self.get_row_lock_table(table_id);
        row_locks.try_lock(row_id, mode, txn_id)
    }

    /// Releases `txn_id`'s lock on a single row.
    pub fn unlock_row(&self, table_id: TableId, row_id: RowId, txn_id: TxnId) -> bool {
        if let Some(row_locks) = self.row_locks.get(&table_id) {
            return row_locks.unlock(row_id, txn_id);
        }
        false
    }

    /// Releases every row and table lock held by `txn_id`, returning the count.
    pub fn release_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;

        // Row locks first, across every per-table lock table.
        for entry in self.row_locks.iter() {
            count += entry.value().unlock_all(txn_id);
        }

        // Then the table-level intent locks.
        let table_ids: Vec<TableId> = self
            .table_locks
            .iter()
            .filter(|e| e.value().holders.contains(&txn_id))
            .map(|e| *e.key())
            .collect();

        for table_id in table_ids {
            if self.unlock_table(table_id, txn_id) {
                count += 1;
            }
        }

        count
    }

    /// Advances the epoch counter and returns its previous value.
    pub fn enter_epoch(&self) -> u64 {
        self.epoch.fetch_add(1, Ordering::AcqRel)
    }

    pub fn current_epoch(&self) -> u64 {
        self.epoch.load(Ordering::Acquire)
    }

    pub fn stats(&self) -> &LockManagerStats {
        &self.stats
    }
}

/// Counters tracking table-level lock activity.
#[derive(Debug, Default)]
pub struct LockManagerStats {
    pub table_locks_acquired: AtomicU64,
    pub table_locks_released: AtomicU64,
    pub table_conflicts: AtomicU64,
}

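/// Seqlock-style version counter for optimistic concurrency control.
///
/// The counter is even while the protected data is stable and odd while a
/// write is in progress. Readers snapshot the version, read the data, and then
/// [`OptimisticVersion::validate`] the snapshot; if it changed, the read is
/// retried.
///
/// Illustrative usage (a sketch, marked `ignore` so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let version = OptimisticVersion::new();
///
/// // Reader: snapshot, read the protected data, then validate.
/// let v = version.read_version();
/// assert!(version.is_stable(v));
/// // ... read the protected data here ...
/// assert!(version.validate(v));
///
/// // Writer: bump to an odd version, mutate, then commit back to even.
/// let guard = version.try_write_begin().unwrap();
/// // ... mutate the protected data here ...
/// guard.commit();
/// ```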
pub struct OptimisticVersion {
    version: AtomicU64,
}

impl Default for OptimisticVersion {
    fn default() -> Self {
        Self::new()
    }
}

impl OptimisticVersion {
    pub fn new() -> Self {
        Self {
            version: AtomicU64::new(0),
        }
    }

    /// Returns the current version for a later [`validate`](Self::validate).
    #[inline]
    pub fn read_version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// A version is stable (no write in progress) when its low bit is clear.
    #[inline]
    pub fn is_stable(&self, version: u64) -> bool {
        version & 1 == 0
    }

    /// Returns `true` if the version is unchanged since `read_version` was
    /// taken, i.e. the optimistic read can be accepted.
    #[inline]
    pub fn validate(&self, read_version: u64) -> bool {
        // Order the preceding data reads before re-reading the version.
        std::sync::atomic::fence(Ordering::Acquire);
        self.version.load(Ordering::Relaxed) == read_version
    }

    /// Attempts to start a write by bumping the version to an odd value.
    /// Returns `None` if another write is in progress or the CAS loses a race.
    pub fn try_write_begin(&self) -> Option<WriteGuard<'_>> {
        let current = self.version.load(Ordering::Acquire);

        if !self.is_stable(current) {
            return None;
        }

        match self.version.compare_exchange(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => Some(WriteGuard {
                version: &self.version,
                start_version: current,
            }),
            Err(_) => None,
        }
    }
}

/// RAII guard for an in-progress optimistic write.
///
/// Dropping the guard without committing rolls the version back to its value
/// before the write began, which aborts the write.
pub struct WriteGuard<'a> {
    version: &'a AtomicU64,
    start_version: u64,
}

impl<'a> WriteGuard<'a> {
    /// Publishes the write by moving to the next even version.
    pub fn commit(self) {
        self.version
            .store(self.start_version + 2, Ordering::Release);
        // Skip the rollback in `Drop`.
        std::mem::forget(self);
    }

    /// Abandons the write, restoring the previous stable version.
    pub fn abort(self) {
        self.version.store(self.start_version, Ordering::Release);
        std::mem::forget(self);
    }
}

impl<'a> Drop for WriteGuard<'a> {
    fn drop(&mut self) {
        // An un-committed guard behaves like `abort`.
        self.version.store(self.start_version, Ordering::Release);
    }
}

/// Guard returned by [`EpochManager::pin`]; the pinned epoch is left when it
/// is dropped.
pub struct EpochGuard {
    manager: Arc<EpochManager>,
    epoch: u64,
}

impl Drop for EpochGuard {
    fn drop(&mut self) {
        self.manager.leave_epoch(self.epoch);
    }
}

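/// Epoch-based reclamation with four epoch slots.
///
/// Threads pin the current epoch before touching shared state; retired objects
/// are tagged with the epoch in which they were retired and are dropped by a
/// later [`EpochManager::advance`] once the corresponding epoch slot has
/// drained.
///
/// Illustrative usage (a sketch, marked `ignore` so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let manager = Arc::new(EpochManager::new());
///
/// let guard = manager.pin();        // protect this thread's accesses
/// manager.retire(vec![1, 2, 3]);    // defer destruction of an unlinked object
/// drop(guard);                      // unpin
///
/// manager.advance();                // advance and reclaim once it is safe
/// manager.advance();
/// ```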
pub struct EpochManager {
    /// The current global epoch.
    global_epoch: AtomicU64,
    /// Number of active pins per epoch, indexed by `epoch % 4`.
    epoch_counts: [AtomicUsize; 4],
    /// Retired objects tagged with the epoch at which they were retired.
    retired: Mutex<Vec<(u64, Box<dyn Send>)>>,
}

impl Default for EpochManager {
    fn default() -> Self {
        Self::new()
    }
}

impl EpochManager {
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            epoch_counts: std::array::from_fn(|_| AtomicUsize::new(0)),
            retired: Mutex::new(Vec::new()),
        }
    }

    /// Pins the current epoch for the calling thread; the returned guard
    /// unpins it when dropped.
    pub fn pin(self: &Arc<Self>) -> EpochGuard {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        self.epoch_counts[(epoch % 4) as usize].fetch_add(1, Ordering::AcqRel);

        EpochGuard {
            manager: self.clone(),
            epoch,
        }
    }

    fn leave_epoch(&self, epoch: u64) {
        self.epoch_counts[(epoch % 4) as usize].fetch_sub(1, Ordering::AcqRel);
    }

    /// Advances the global epoch and reclaims old retirees if the epoch two
    /// steps behind has no remaining pins.
    pub fn advance(&self) {
        let current = self.global_epoch.load(Ordering::Acquire);
        // Slot of epoch `current - 2` (indices wrap modulo 4).
        let old_epoch = (current + 2) % 4;
        if self.epoch_counts[old_epoch as usize].load(Ordering::Acquire) == 0 {
            self.global_epoch.fetch_add(1, Ordering::AcqRel);
            self.reclaim(current.saturating_sub(2));
        }
    }

    /// Defers destruction of `item`, tagging it with the current epoch.
    pub fn retire<T: Send + 'static>(&self, item: T) {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        let mut retired = self.retired.lock();
        retired.push((epoch, Box::new(item)));
    }

    /// Drops every retired object tagged with an epoch at or before `safe_epoch`.
    fn reclaim(&self, safe_epoch: u64) {
        let mut retired = self.retired.lock();
        retired.retain(|(epoch, _)| *epoch > safe_epoch);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_intent_lock_compatibility() {
        use IntentLock::*;

        assert!(IntentShared.is_compatible(&IntentShared));
        assert!(IntentShared.is_compatible(&IntentExclusive));
        assert!(IntentShared.is_compatible(&Shared));
        assert!(!IntentShared.is_compatible(&Exclusive));

        assert!(IntentExclusive.is_compatible(&IntentShared));
        assert!(IntentExclusive.is_compatible(&IntentExclusive));
        assert!(!IntentExclusive.is_compatible(&Shared));
        assert!(!IntentExclusive.is_compatible(&Exclusive));

        assert!(Shared.is_compatible(&IntentShared));
        assert!(!Shared.is_compatible(&IntentExclusive));
        assert!(Shared.is_compatible(&Shared));
        assert!(!Shared.is_compatible(&Exclusive));

        assert!(!Exclusive.is_compatible(&IntentShared));
        assert!(!Exclusive.is_compatible(&IntentExclusive));
        assert!(!Exclusive.is_compatible(&Shared));
        assert!(!Exclusive.is_compatible(&Exclusive));
    }

    #[test]
    fn test_sharded_lock_table_basic() {
        let table = ShardedLockTable::new();

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::WouldBlock
        );

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );

        assert_eq!(
            table.try_lock(2, LockMode::Exclusive, 200),
            LockResult::Acquired
        );

        assert!(table.unlock(1, 100));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_table_shared() {
        let table = ShardedLockTable::new();

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 300),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::WouldBlock
        );

        assert!(table.unlock(1, 100));
        assert!(table.unlock(1, 200));
        assert!(table.unlock(1, 300));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_upgrade() {
        let table = ShardedLockTable::new();

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );
    }

    #[test]
    fn test_lock_manager_hierarchical() {
        let manager = LockManager::new();

        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 1000),
            LockResult::Acquired
        );

        assert_eq!(
            manager.lock_row(1, 200, LockMode::Shared, 2000),
            LockResult::Acquired
        );

        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 2000),
            LockResult::WouldBlock
        );

        let released = manager.release_all(1000);
        assert!(released >= 1);
    }

    #[test]
    fn test_optimistic_version() {
        let version = OptimisticVersion::new();

        let v = version.read_version();
        assert!(version.is_stable(v));
        assert!(version.validate(v));

        {
            let guard = version.try_write_begin().unwrap();
            let v_during = version.read_version();
            assert!(!version.is_stable(v_during));
            guard.commit();
        }

        let v2 = version.read_version();
        assert!(version.is_stable(v2));
        assert_eq!(v2, 2);
    }

    #[test]
    fn test_optimistic_concurrent() {
        let version = Arc::new(OptimisticVersion::new());

        let guard = version.try_write_begin().unwrap();

        let version2 = version.clone();
        let result = version2.try_write_begin();
        assert!(result.is_none());

        guard.commit();

        let guard2 = version.try_write_begin().unwrap();
        guard2.commit();
    }

    #[test]
    fn test_epoch_manager() {
        let manager = Arc::new(EpochManager::new());

        let guard1 = manager.pin();
        assert_eq!(guard1.epoch, 0);

        manager.retire(vec![1, 2, 3]);

        manager.advance();

        let guard2 = manager.pin();
        assert!(guard2.epoch >= guard1.epoch);

        drop(guard1);
        drop(guard2);

        manager.advance();
        manager.advance();
    }

    #[test]
    fn test_sharded_distribution() {
        let table = ShardedLockTable::new();

        for i in 0..1000u128 {
            assert_eq!(table.try_lock(i, LockMode::Shared, 1), LockResult::Acquired);
        }

        let mut non_empty_shards = 0;
        for shard in &table.shards {
            if !shard.lock().is_empty() {
                non_empty_shards += 1;
            }
        }

        assert!(
            non_empty_shards > 100,
            "Expected better distribution: {} shards used",
            non_empty_shards
        );
    }

    #[test]
    fn test_unlock_all() {
        let table = ShardedLockTable::new();

        for i in 0..50u128 {
            table.try_lock(i, LockMode::Exclusive, 100);
        }

        for i in 50..100u128 {
            table.try_lock(i, LockMode::Exclusive, 200);
        }

        let released = table.unlock_all(100);
        assert_eq!(released, 50);

        assert_eq!(
            table.try_lock(50, LockMode::Exclusive, 300),
            LockResult::WouldBlock
        );

        assert_eq!(
            table.try_lock(0, LockMode::Exclusive, 300),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_concurrent_locks() {
        let table = Arc::new(ShardedLockTable::new());
        let mut handles = vec![];

        for txn_id in 0..16u64 {
            let table = table.clone();
            handles.push(thread::spawn(move || {
                let start = txn_id as u128 * 100;
                for i in 0..100 {
                    let result = table.try_lock(start + i, LockMode::Exclusive, txn_id);
                    assert_eq!(result, LockResult::Acquired);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(
            table.stats().exclusive_acquired.load(Ordering::Relaxed),
            1600
        );
    }
}